Module: AWS::Flow::Core
Defined in:
- lib/aws/flow/fiber.rb
- lib/aws/flow/tasks.rb
- lib/aws/flow/future.rb
- lib/aws/flow/flow_utils.rb
- lib/aws/flow/simple_dfa.rb
- lib/aws/flow/async_scope.rb
- lib/aws/flow/implementation.rb
- lib/aws/flow/async_backtrace.rb
- lib/aws/flow/begin_rescue_ensure.rb
Defined Under Namespace
Modules: SimpleDFA Classes: AlreadySetException, AsyncBacktrace, AsyncEventLoop, AsyncScope, BeginRescueEnsure, BeginRescueEnsureWrapper, CancellationException, DaemonBeginRescueEnsure, DaemonTask, ExternalConditionVariable, ExternalFuture, ExternalTask, ExternalTaskCompletionHandle, FiberConditionVariable, FlowFiber, Future, IllegalStateException, NoContextException, RootAsyncScope, Task, TaskContext
Instance Method Summary
- #_error_handler(&block) ⇒ Object private
-
#daemon_task(&block) ⇒ Future
The task's result, which is a Future.
-
#error_handler(&block) ⇒ Object
Creates a new error handler for asynchronous tasks.
- #external_task(&block) ⇒ nil
- #gate_by_version(version, method, &block) ⇒ Object private
- #make_backtrace(parent_backtrace) ⇒ Object private
-
#task(future = nil, &block) ⇒ Future
The task's result, which is a Future.
-
#timed_wait_for_all(timeout, *futures) ⇒ Array<Future>
Blocks until all of the arguments are set or until timeout expires.
-
#timed_wait_for_any(timeout, *futures) ⇒ Array<Future>
Blocks until any of the arguments are set or until timeout expires.
-
#timed_wait_for_function(timeout, function, *futures) ⇒ Array<Future>
Waits until the passed-in function, called with the futures that are already set and the full list of futures, returns a true value, or until timeout expires.
-
#wait_for_all(*futures) ⇒ Array<Future>
Blocks until all of the arguments are set.
-
#wait_for_any(*futures) ⇒ Array<Future>
Blocks until any of the arguments are set.
-
#wait_for_function(function, *futures) ⇒ Array<Future>
Waits until the passed-in function, called with the futures that are already set and the full list of futures, returns a true value.
-
#wait_for_function_helper(timeout, function, *futures) ⇒ Object
Helper method to refactor away the common implementation of wait_for_function and timed_wait_for_function.
Instance Method Details
#_error_handler(&block) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
110 111 112 |
# File 'lib/aws/flow/implementation.rb', line 110
#
# Builds an error handler from the given block via #error_handler and
# returns the value of calling #result on it.
#
# @api private
def _error_handler(&block)
  handler = error_handler(&block)
  handler.result
end
#daemon_task(&block) ⇒ Future
Returns the task's result, which is a Future.
56 57 58 59 60 61 62 63 64 |
# File 'lib/aws/flow/implementation.rb', line 56
#
# Wraps the block in a DaemonTask, pushes it onto the context of the
# current fiber and hands back the task's result.
#
# @return [Future] the task's result, which is a Future
# @raise [NoContextException] if the current fiber does not respond to
#   __context__
def daemon_task(&block)
  current_fiber = ::Fiber.current
  raise NoContextException unless current_fiber.respond_to?(:__context__)

  flow_context = current_fiber.__context__
  daemon = DaemonTask.new(nil, &block)
  # The TaskContext is constructed here but not referenced again in this method.
  TaskContext.new(:parent => flow_context.get_closest_containing_scope, :task => daemon)
  flow_context << daemon
  daemon.result
end
#error_handler(&block) ⇒ Object
Creates a new error handler for asynchronous tasks.
97 98 99 100 101 102 103 104 105 106 |
# File 'lib/aws/flow/implementation.rb', line 97
#
# Creates a new error handler for asynchronous tasks.
#
# A BeginRescueEnsure is created under the closest containing scope of the
# current fiber's context, and the block is wrapped together with it in a
# BeginRescueEnsureWrapper. The wrapper is pushed onto the context first,
# then the BeginRescueEnsure itself.
#
# @return the newly created BeginRescueEnsure
# @raise [NoContextException] if the current fiber does not respond to
#   __context__
def error_handler(&block)
  current_fiber = ::Fiber.current
  raise NoContextException unless current_fiber.respond_to?(:__context__)

  flow_context = current_fiber.__context__
  handler = BeginRescueEnsure.new(:parent => flow_context.get_closest_containing_scope)
  wrapper = BeginRescueEnsureWrapper.new(block, handler)
  flow_context << wrapper
  flow_context << handler
  handler
end
#external_task(&block) ⇒ nil
75 76 77 78 79 80 81 82 83 |
# File 'lib/aws/flow/implementation.rb', line 75
#
# Wraps the block in an ExternalTask whose parent is the closest containing
# scope of the current fiber's context, then pushes the task onto that
# context.
#
# @return [nil]
# @raise [NoContextException] if the current fiber does not respond to
#   __context__
def external_task(&block)
  current_fiber = ::Fiber.current
  raise NoContextException unless current_fiber.respond_to?(:__context__)

  flow_context = current_fiber.__context__
  external = ExternalTask.new(:parent => flow_context.get_closest_containing_scope, &block)
  # The TaskContext is constructed here but not referenced again in this method.
  TaskContext.new(:parent => flow_context.get_closest_containing_scope, :task => external)
  flow_context << external
  nil
end
#gate_by_version(version, method, &block) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
23 24 25 26 27 |
# File 'lib/aws/flow/async_scope.rb', line 23
#
# Calls the block only when RUBY_VERSION satisfies the given comparison
# against +version+.
#
# For the ordering operators (<, <=, >, >=) the two versions are compared
# segment by segment through Gem::Version, so that e.g. "3.2.2" is treated
# as lower than "10.0.0". A plain String comparison would order those two
# the other way round because it compares character by character.
#
# For any other +method+, or when +version+ is not a well-formed version
# string or Gem::Version is not loaded, +method+ is sent to the
# RUBY_VERSION String with +version+ as its argument.
#
# @api private
# @param version [String] the version to compare RUBY_VERSION against
# @param method [Symbol, String] the name of the comparison to send
# @return the value of the block when the comparison is truthy, otherwise nil
def gate_by_version(version, method, &block)
  current = RUBY_VERSION
  required = version

  ordering = [:<, :<=, :>, :>=].include?(method.to_s.to_sym)
  if ordering && defined?(Gem::Version) && Gem::Version.correct?(version.to_s)
    current = Gem::Version.new(RUBY_VERSION)
    required = Gem::Version.new(version.to_s)
  end

  block.call if current.send(method, required)
end
#make_backtrace(parent_backtrace) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/aws/flow/flow_utils.rb', line 33
#
# Creates an AsyncBacktrace from the parent backtrace, skipping the frames
# that belong to the framework's own call chain.
#
# @api private
# @param parent_backtrace the backtrace handed on to AsyncBacktrace.create
# @return the value returned by AsyncBacktrace.create
def make_backtrace(parent_backtrace)
  # Frames that are skipped:
  #   1 frame for the function that actually removes the stack traces.
  #   1 frame for the function that calls into the function that removes
  #     frames in AsyncBacktrace.
  #   1 frame for the call into this function.
  #   1 frame for the initialize call of the BeginRescueEnsure or ExternalTask.
  #   1 frame for the new call into the BeginRescueEnsure or ExternalTask.
  #   1 frame for the AsyncScope initialize that the
  #     BeginRescueEnsure/ExternalTask has to be in.
  #
  # Example of the frames in question:
  #   "./lib/aws/rubyflow/asyncBacktrace.rb:75:in `caller'"
  #   "./lib/aws/rubyflow/asyncBacktrace.rb:21:in `create'"
  #   "./lib/aws/rubyflow/flow.rb:16:in `make_backtrace'"
  #   "./lib/aws/rubyflow/flow.rb:103:in `initialize'"
  #   "./lib/aws/rubyflow/asyncScope.rb:17:in `new'"
  #   "./lib/aws/rubyflow/asyncScope.rb:17:in `initialize'"
  #
  # NOTE(review): the list above accounts for six frames while seven are
  # skipped - confirm which frame the seventh one is.
  frames_to_skip = 7
  AsyncBacktrace.create(parent_backtrace, frames_to_skip)
end
#task(future = nil, &block) ⇒ Future
Returns the task's result, which is a Future.
36 37 38 39 40 41 42 43 44 |
# File 'lib/aws/flow/implementation.rb', line 36
#
# Wraps the block in a Task, pushes it onto the context of the current
# fiber and hands back the task's result.
#
# @param future accepted for callers that pass it, but not referenced in
#   the body of this method
# @return [Future] the task's result, which is a Future
# @raise [NoContextException] if the current fiber does not respond to
#   __context__
def task(future = nil, &block)
  current_fiber = ::Fiber.current
  raise NoContextException unless current_fiber.respond_to?(:__context__)

  flow_context = current_fiber.__context__
  new_task = Task.new(nil, &block)
  # The TaskContext is constructed here but not referenced again in this method.
  TaskContext.new(:parent => flow_context.get_closest_containing_scope, :task => new_task)
  flow_context << new_task
  new_task.result
end
#timed_wait_for_all(timeout, *futures) ⇒ Array<Future>
Blocks until all of the arguments are set or until timeout expires.
180 181 182 |
# File 'lib/aws/flow/implementation.rb', line 180
#
# Blocks until all of the arguments are set or until timeout expires.
#
# The wait is delegated to #timed_wait_for_function with a predicate that
# holds once as many futures are set as were passed in.
#
# @param timeout handed on unchanged to #timed_wait_for_function
# @param futures the futures to wait for
# @return the value returned by #timed_wait_for_function
def timed_wait_for_all(timeout, *futures)
  every_future_set = lambda do |result, future_list|
    result.size == future_list.size
  end
  # The array is passed as one argument, exactly as collected by the splat.
  timed_wait_for_function(timeout, every_future_set, futures)
end
#timed_wait_for_any(timeout, *futures) ⇒ Array<Future>
Blocks until any of the arguments are set or until timeout expires.
165 166 167 |
# File 'lib/aws/flow/implementation.rb', line 165
#
# Blocks until any of the arguments are set.
#
# The wait is delegated to #timed_wait_for_function with a predicate that
# holds once at least one future is set.
#
# @param timeout handed on unchanged to #timed_wait_for_function
# @param futures the futures to wait for
# @return the value returned by #timed_wait_for_function
def timed_wait_for_any(timeout, *futures)
  some_future_set = lambda do |result, future_list|
    result.length >= 1
  end
  # The array is passed as one argument, exactly as collected by the splat.
  timed_wait_for_function(timeout, some_future_set, futures)
end
#timed_wait_for_function(timeout, function, *futures) ⇒ Array<Future>
Waits until the passed-in function, called with the futures that are already set and the full list of futures, returns a true value, or until timeout expires.
198 199 200 |
# File 'lib/aws/flow/implementation.rb', line 198
#
# Timed variant of #wait_for_function: forwards the timeout, the function
# and the futures to #wait_for_function_helper.
#
# @param timeout handed on unchanged to #wait_for_function_helper
# @param function handed on unchanged to #wait_for_function_helper
# @param futures the futures to wait on
# @return the value returned by #wait_for_function_helper
def timed_wait_for_function(timeout, function, *futures)
  helper_arguments = [timeout, function, *futures]
  wait_for_function_helper(*helper_arguments)
end
#wait_for_all(*futures) ⇒ Array<Future>
Blocks until all of the arguments are set.
150 151 152 |
# File 'lib/aws/flow/implementation.rb', line 150
#
# Blocks until all of the arguments are set.
#
# The wait is delegated to #wait_for_function with a predicate that holds
# once as many futures are set as were passed in.
#
# @param futures the futures to wait for
# @return the value returned by #wait_for_function
def wait_for_all(*futures)
  every_future_set = lambda do |result, future_list|
    result.size == future_list.size
  end
  # The array is passed as one argument, exactly as collected by the splat.
  wait_for_function(every_future_set, futures)
end
#wait_for_any(*futures) ⇒ Array<Future>
Blocks until any of the arguments are set.
138 139 140 |
# File 'lib/aws/flow/implementation.rb', line 138
#
# Blocks until any of the arguments are set.
#
# The wait is delegated to #wait_for_function with a predicate that holds
# once at least one future is set.
#
# @param futures the futures to wait for
# @return the value returned by #wait_for_function
def wait_for_any(*futures)
  some_future_set = lambda do |result, future_list|
    result.length >= 1
  end
  # The array is passed as one argument, exactly as collected by the splat.
  wait_for_function(some_future_set, futures)
end
#wait_for_function(function, *futures) ⇒ Array<Future>
Waits until the passed-in function, called with the futures that are already set and the full list of futures, returns a true value.
126 127 128 |
# File 'lib/aws/flow/implementation.rb', line 126
#
# Untimed variant: forwards the function and the futures to
# #wait_for_function_helper with nil as the timeout.
#
# @param function handed on unchanged to #wait_for_function_helper
# @param futures the futures to wait on
# @return the value returned by #wait_for_function_helper
def wait_for_function(function, *futures)
  no_timeout = nil
  wait_for_function_helper(no_timeout, function, *futures)
end
#wait_for_function_helper(timeout, function, *futures) ⇒ Object
Helper method to refactor away the common implementation of wait_for_function and timed_wait_for_function.
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 |
# File 'lib/aws/flow/implementation.rb', line 204
#
# Helper method holding the common implementation of wait_for_function and
# timed_wait_for_function.
#
# The futures are flattened into one list, which must contain either no
# ExternalFuture objects or nothing but ExternalFuture objects. The function
# is called with the futures that are set so far and the full list; the
# wait ends once it returns a true value.
#
# @param timeout passed to the wait on an ExternalConditionVariable; it is
#   not used when a FiberConditionVariable is waited on
# @param function a callable taking (set_futures, all_futures)
# @param futures the futures to watch; nested arrays are flattened
# @return nil when no futures are given; the result of
#   futures.find(&:set?) when the function already holds before waiting;
#   otherwise the list of futures that have been set
# @raise [ArgumentError] when ExternalFuture objects are mixed with others
# @raise [Timeout::Error] when the wait on an ExternalConditionVariable
#   returns and the function still does not hold
def wait_for_function_helper(timeout, function, *futures)
  futures.flatten!
  external = futures.select { |candidate| candidate.is_a?(ExternalFuture) }

  unless external.empty? || external.size == futures.size
    raise ArgumentError, "The futures array must contain either all "\
      "objects of Future or all objects of ExternalFuture"
  end

  condition =
    if external.empty?
      FiberConditionVariable.new
    else
      ExternalConditionVariable.new
    end

  return nil if futures.empty?

  completed = futures.select(&:set?)
  return futures.find(&:set?) if function.call(completed, futures)

  # Record each future as it gets set and wake the waiter once the
  # function holds.
  futures.each do |pending|
    pending.on_set do |set_one|
      completed << set_one
      condition.broadcast if function.call(completed, futures)
    end
  end

  case condition
  when FiberConditionVariable
    condition.wait
  else
    condition.wait(timeout)
    raise Timeout::Error.new unless function.call(completed, futures)
  end
  completed
end