Module: AWS::Flow::Core
- Defined in:
- lib/aws/flow/fiber.rb,
lib/aws/flow/tasks.rb,
lib/aws/flow/future.rb,
lib/aws/flow/flow_utils.rb,
lib/aws/flow/simple_dfa.rb,
lib/aws/flow/async_scope.rb,
lib/aws/flow/implementation.rb,
lib/aws/flow/async_backtrace.rb,
lib/aws/flow/begin_rescue_ensure.rb
Defined Under Namespace
Modules: SimpleDFA Classes: AlreadySetException, AsyncBacktrace, AsyncEventLoop, AsyncScope, BeginRescueEnsure, BeginRescueEnsureWrapper, CancellationException, DaemonBeginRescueEnsure, DaemonTask, ExternalTask, ExternalTaskCompletionHandle, FiberConditionVariable, FlowFiber, Future, IllegalStateException, NoContextException, RootAsyncScope, Task, TaskContext
Instance Method Summary collapse
- #_error_handler(&block) ⇒ Object private
-
#daemon_task(&block) ⇒ Future
The tasks result, which is a Future.
-
#error_handler(&block) ⇒ Object
Creates a new error handler for asynchronous tasks.
- #external_task(&block) ⇒ nil
- #gate_by_version(version, method, &block) ⇒ Object private
- #make_backtrace(parent_backtrace) ⇒ Object private
-
#task(future = nil, &block) ⇒ Future
The tasks result, which is a Future.
-
#wait_for_all(*futures) ⇒ Array<Future>
Blocks until all of the arguments are set.
-
#wait_for_any(*futures) ⇒ Array<Future>
Blocks until any of the arguments are set.
-
#wait_for_function(function, *futures) ⇒ Array<Future>
Waits for the passed-in function to complete, setting values for the provided futures when it does.
Instance Method Details
#_error_handler(&block) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
110 111 112 |
# File 'lib/aws/flow/implementation.rb', line 110 def _error_handler(&block) error_handler(&block).result end |
#daemon_task(&block) ⇒ Future
Returns The tasks result, which is a Future.
56 57 58 59 60 61 62 63 64 |
# File 'lib/aws/flow/implementation.rb', line 56 def daemon_task(&block) fiber = ::Fiber.current raise NoContextException unless fiber.respond_to? :__context__ context = fiber.__context__ t = DaemonTask.new(nil, &block) task_context = TaskContext.new(:parent => context.get_closest_containing_scope, :task => t) context << t t.result end |
#error_handler(&block) ⇒ Object
Creates a new error handler for asynchronous tasks.
97 98 99 100 101 102 103 104 105 106 |
# File 'lib/aws/flow/implementation.rb', line 97 def error_handler(&block) fiber = ::Fiber.current raise NoContextException unless fiber.respond_to? :__context__ context = fiber.__context__ begin_rescue_ensure = BeginRescueEnsure.new(:parent => context.get_closest_containing_scope) bge = BeginRescueEnsureWrapper.new(block, begin_rescue_ensure) context << bge context << begin_rescue_ensure begin_rescue_ensure end |
#external_task(&block) ⇒ nil
75 76 77 78 79 80 81 82 83 |
# File 'lib/aws/flow/implementation.rb', line 75 def external_task(&block) fiber = ::Fiber.current raise NoContextException unless fiber.respond_to? :__context__ context = fiber.__context__ t = ExternalTask.new(:parent => context.get_closest_containing_scope, &block) task_context = TaskContext.new(:parent => context.get_closest_containing_scope, :task => t) context << t nil end |
#gate_by_version(version, method, &block) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
23 24 25 26 27 |
# File 'lib/aws/flow/async_scope.rb', line 23 def gate_by_version(version, method, &block) if RUBY_VERSION.send(method, version) block.call end end |
#make_backtrace(parent_backtrace) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/aws/flow/flow_utils.rb', line 32 def make_backtrace(parent_backtrace) # 1 frame for the function that actually removes the stack traces. # 1 frame for the function that calls into the function that removes # frames in AsyncBacktrace. # 1 frame for the call into this function. # 1 frame for the initialize call of the BeginRescueEnsure or ExternalTask. # 1 frame for the new call into the BeginRescueEnsure or ExternalTask. # 1 frame for the AsyncScope initialize that the BeginRescueEnsure/ExternalTask has to be in. # "./lib/aws/rubyflow/asyncBacktrace.rb:75:in `caller'" # "./lib/aws/rubyflow/asyncBacktrace.rb:21:in `create'" # "./lib/aws/rubyflow/flow.rb:16:in `make_backtrace'" # "./lib/aws/rubyflow/flow.rb:103:in `initialize'" # "./lib/aws/rubyflow/asyncScope.rb:17:in `new'" # "./lib/aws/rubyflow/asyncScope.rb:17:in `initialize'" frames_to_skip = 7 backtrace = AsyncBacktrace.create(parent_backtrace, frames_to_skip) end |
#task(future = nil, &block) ⇒ Future
Returns The tasks result, which is a Future.
36 37 38 39 40 41 42 43 44 |
# File 'lib/aws/flow/implementation.rb', line 36 def task(future = nil, &block) fiber = ::Fiber.current raise NoContextException unless fiber.respond_to? :__context__ context = fiber.__context__ t = Task.new(nil, &block) task_context = TaskContext.new(:parent => context.get_closest_containing_scope, :task => t) context << t t.result end |
#wait_for_all(*futures) ⇒ Array<Future>
Blocks until all of the arguments are set.
162 163 164 |
# File 'lib/aws/flow/implementation.rb', line 162 def wait_for_all(*futures) wait_for_function(lambda {|result, future_list| result.size == future_list.size}, futures) end |
#wait_for_any(*futures) ⇒ Array<Future>
Blocks until any of the arguments are set.
150 151 152 |
# File 'lib/aws/flow/implementation.rb', line 150 def wait_for_any(*futures) wait_for_function(lambda {|result, future_list| result.length >= 1 }, futures) end |
#wait_for_function(function, *futures) ⇒ Array<Future>
Waits for the passed-in function to complete, setting values for the provided futures when it does.
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/aws/flow/implementation.rb', line 126 def wait_for_function(function, *futures) conditional = FiberConditionVariable.new futures.flatten! return nil if futures.empty? result = futures.select(&:set?) return futures.find(&:set?)if function.call(result, futures) futures.each do |f| f.on_set do |set_one| result << set_one conditional.broadcast if function.call(result, futures) end end conditional.wait result end |