Module: AWS::Flow::Core

Defined in:
lib/aws/flow/fiber.rb,
lib/aws/flow/tasks.rb,
lib/aws/flow/future.rb,
lib/aws/flow/flow_utils.rb,
lib/aws/flow/simple_dfa.rb,
lib/aws/flow/async_scope.rb,
lib/aws/flow/implementation.rb,
lib/aws/flow/async_backtrace.rb,
lib/aws/flow/begin_rescue_ensure.rb

Defined Under Namespace

Modules: SimpleDFA Classes: AlreadySetException, AsyncBacktrace, AsyncEventLoop, AsyncScope, BeginRescueEnsure, BeginRescueEnsureWrapper, CancellationException, DaemonBeginRescueEnsure, DaemonTask, ExternalConditionVariable, ExternalFuture, ExternalTask, ExternalTaskCompletionHandle, FiberConditionVariable, FlowFiber, Future, IllegalStateException, NoContextException, RootAsyncScope, Task, TaskContext

Instance Method Summary collapse

Instance Method Details

#_error_handler(&block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



110
111
112
# File 'lib/aws/flow/implementation.rb', line 110

# Private convenience wrapper around error_handler: builds the handler for
# the given block and hands back the handler's +result+ rather than the
# handler object itself.
#
# @param block the block forwarded unchanged to error_handler
# @return whatever +result+ returns on the object produced by error_handler
# @api private
def _error_handler(&block)
  handler = error_handler(&block)
  handler.result
end

#daemon_task(&block) ⇒ Future

Returns the task's result, which is a Future.

Parameters:

  • block

    The block of code to be executed when the daemon task is run.

Returns:

Raises:

  • (NoContextException)

    If the current fiber does not respond to `Fiber.__context__`.



56
57
58
59
60
61
62
63
64
# File 'lib/aws/flow/implementation.rb', line 56

# Creates a DaemonTask for the given block, registers it with the current
# fiber's context, and returns the task's result.
#
# @param block the code the new DaemonTask is constructed with
# @return the value of +result+ on the newly created DaemonTask
# @raise [NoContextException] if the current fiber does not respond to +__context__+
def daemon_task(&block)
  current_fiber = ::Fiber.current
  raise NoContextException unless current_fiber.respond_to?(:__context__)

  owner = current_fiber.__context__
  daemon = DaemonTask.new(nil, &block)
  # The TaskContext instance is not used afterwards; it is still built here,
  # presumably so its constructor can attach itself to the task -- verify
  # against TaskContext before removing.
  TaskContext.new(:parent => owner.get_closest_containing_scope, :task => daemon)
  owner << daemon
  daemon.result
end

#error_handler(&block) ⇒ Object

Creates a new error handler for asynchronous tasks.

Parameters:

Returns:

  • The result of the `begin` statement if there is no error; otherwise the value of the `return` statement.

Raises:

  • (NoContextException)

    If the current fiber does not respond to `Fiber.__context__`.



97
98
99
100
101
102
103
104
105
106
# File 'lib/aws/flow/implementation.rb', line 97

# Builds a BeginRescueEnsure for the current scope, wraps the given block
# together with it in a BeginRescueEnsureWrapper, and registers both with
# the current fiber's context (wrapper first, then the BeginRescueEnsure).
#
# @param block the block handed to BeginRescueEnsureWrapper
# @return the BeginRescueEnsure instance created here (callers that want the
#   outcome read its +result+, as _error_handler does)
# @raise [NoContextException] if the current fiber does not respond to +__context__+
def error_handler(&block)
  current_fiber = ::Fiber.current
  raise NoContextException unless current_fiber.respond_to?(:__context__)

  owner = current_fiber.__context__
  handler = BeginRescueEnsure.new(:parent => owner.get_closest_containing_scope)
  wrapper = BeginRescueEnsureWrapper.new(block, handler)
  # Registration order matters to the trace: wrapper, then handler.
  owner << wrapper
  owner << handler
  handler
end

#external_task(&block) ⇒ nil

Parameters:

  • block

    The block of code to be executed when the external task is run.

Returns:

  • (nil)

Raises:

  • (NoContextException)

    If the current fiber does not respond to `Fiber.__context__`.



75
76
77
78
79
80
81
82
83
# File 'lib/aws/flow/implementation.rb', line 75

# Creates an ExternalTask for the given block, parented to the closest
# containing scope of the current fiber's context, and registers it with
# that context.
#
# @param block the code the new ExternalTask is constructed with
# @return [nil] always
# @raise [NoContextException] if the current fiber does not respond to +__context__+
def external_task(&block)
  current_fiber = ::Fiber.current
  raise NoContextException unless current_fiber.respond_to?(:__context__)

  owner = current_fiber.__context__
  external = ExternalTask.new(:parent => owner.get_closest_containing_scope, &block)
  # The TaskContext instance is not used afterwards; it is still built here,
  # presumably so its constructor can attach itself to the task -- verify
  # against TaskContext before removing. The scope is looked up a second
  # time, exactly as before.
  TaskContext.new(:parent => owner.get_closest_containing_scope, :task => external)
  owner << external
  nil
end

#gate_by_version(version, method, &block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



23
24
25
26
27
# File 'lib/aws/flow/async_scope.rb', line 23

# Runs the given block only when the running interpreter's version passes
# the check described by +method+ and +version+.
#
# For the ordering operators (<, <=, >, >=) the versions are compared
# segment by segment through Gem::Version, so that e.g. "3.2.0" is not
# considered >= "10.0" (a plain String comparison gets this wrong). Any
# other +method+, or a +version+ that Gem::Version cannot parse, falls back
# to sending +method+ to the RUBY_VERSION string, which is the original
# behaviour.
#
# @param version the version to check RUBY_VERSION against, e.g. "1.9"
# @param method [Symbol, String] the method used for the check, e.g. :>=
# @param block the code to run when the check passes
# @return the block's value when the check passes; otherwise nil
# @api private
def gate_by_version(version, method, &block)
  ordering = %w[< <= > >=].include?(method.to_s)
  satisfied =
    if ordering && defined?(Gem::Version)
      begin
        Gem::Version.new(RUBY_VERSION).send(method, Gem::Version.new(version))
      rescue ArgumentError
        # Unparseable version: keep the original String-based comparison.
        RUBY_VERSION.send(method, version)
      end
    else
      RUBY_VERSION.send(method, version)
    end
  block.call if satisfied
end

#make_backtrace(parent_backtrace) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/aws/flow/flow_utils.rb', line 33

# Builds an AsyncBacktrace from the parent backtrace, skipping the frames
# that belong to the framework's own plumbing.
#
# @param parent_backtrace the backtrace handed on to AsyncBacktrace.create
# @return whatever AsyncBacktrace.create returns
# @api private
def make_backtrace(parent_backtrace)
  # Frames accounted for by the skip count:
  # 1 frame for the function that actually removes the stack traces.
  # 1 frame for the function that calls into the function that removes
  # frames in AsyncBacktrace.
  # 1 frame for the call into this function.
  # 1 frame for the initialize call of the BeginRescueEnsure or ExternalTask.
  # 1 frame for the new call into the BeginRescueEnsure or ExternalTask.
  # 1 frame for the AsyncScope initialize that the BeginRescueEnsure/ExternalTask has to be in.

  # Example of the frames in question:
  # "./lib/aws/rubyflow/asyncBacktrace.rb:75:in `caller'"
  # "./lib/aws/rubyflow/asyncBacktrace.rb:21:in `create'"
  # "./lib/aws/rubyflow/flow.rb:16:in `make_backtrace'"
  # "./lib/aws/rubyflow/flow.rb:103:in `initialize'"
  # "./lib/aws/rubyflow/asyncScope.rb:17:in `new'"
  # "./lib/aws/rubyflow/asyncScope.rb:17:in `initialize'"

  # NOTE(review): six frames are itemised above while seven are skipped;
  # confirm which is intended before changing this number.
  skipped_frame_count = 7
  AsyncBacktrace.create(parent_backtrace, skipped_frame_count)
end

#task(future = nil, &block) ⇒ Future

Returns the task's result, which is a Future.

Parameters:

  • future (Future) (defaults to: nil)

    Unused; defaults to nil.

  • block

    The block of code to be executed when the task is run.

Returns:

Raises:

  • (NoContextException)

    If the current fiber does not respond to `Fiber.__context__`.



36
37
38
39
40
41
42
43
44
# File 'lib/aws/flow/implementation.rb', line 36

# Creates a Task for the given block, registers it with the current fiber's
# context, and returns the task's result.
#
# @param future not used by this method; defaults to nil
# @param block the code the new Task is constructed with
# @return the value of +result+ on the newly created Task
# @raise [NoContextException] if the current fiber does not respond to +__context__+
def task(future = nil, &block)
  current_fiber = ::Fiber.current
  raise NoContextException unless current_fiber.respond_to?(:__context__)

  owner = current_fiber.__context__
  new_task = Task.new(nil, &block)
  # The TaskContext instance is not used afterwards; it is still built here,
  # presumably so its constructor can attach itself to the task -- verify
  # against TaskContext before removing.
  TaskContext.new(:parent => owner.get_closest_containing_scope, :task => new_task)
  owner << new_task
  new_task.result
end

#timed_wait_for_all(timeout, *futures) ⇒ Array<Future>

Blocks until all of the arguments are set or until the timeout expires.

Parameters:

  • futures (Array<Future>)

    A list of futures to wait for. The function will return only when all of them are set.

  • timeout (Integer)

    The timeout value after which it will raise a Timeout::Error

Returns:

  • (Array<Future>)

    A list of the set futures, in the order of being set.



180
181
182
# File 'lib/aws/flow/implementation.rb', line 180

# Waits, with a timeout, until every one of the given futures has been set.
#
# @param timeout the timeout forwarded to timed_wait_for_function
# @param futures [Array<Future>] the futures to wait on
# @return whatever timed_wait_for_function returns
def timed_wait_for_all(timeout, *futures)
  # Satisfied once as many futures have been set as were passed in.
  every_future_set = lambda do |set_futures, all_futures|
    set_futures.length == all_futures.length
  end
  timed_wait_for_function(timeout, every_future_set, futures)
end

#timed_wait_for_any(timeout, *futures) ⇒ Array<Future>

Blocks until any of the arguments are set or until the timeout expires.

Parameters:

  • futures (Array<Future>)

    A list of futures to wait for. The function will return when at least one of these is set.

  • timeout (Integer)

    The timeout value after which it will raise a Timeout::Error

Returns:

  • (Array<Future>)

    A list of the set futures, in the order of being set.



165
166
167
# File 'lib/aws/flow/implementation.rb', line 165

# Waits, with a timeout, until at least one of the given futures has been set.
#
# @param timeout the timeout forwarded to timed_wait_for_function
# @param futures [Array<Future>] the futures to wait on
# @return whatever timed_wait_for_function returns
def timed_wait_for_any(timeout, *futures)
  # Satisfied as soon as the list of set futures is non-empty.
  some_future_set = lambda do |set_futures, _all_futures|
    !set_futures.empty?
  end
  timed_wait_for_function(timeout, some_future_set, futures)
end

#timed_wait_for_function(timeout, function, *futures) ⇒ Array<Future>

Waits until the passed-in function, called with the list of futures set so far and the full list of futures, returns true. The timeout is forwarded to wait_for_function_helper.

Parameters:

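  • function

    A callable that is invoked with the list of futures set so far and the full list of futures; waiting ends once it returns a true value.

  • futures (Array<Future>)

    The futures to wait on. Each time one of them is set, the function is re-evaluated.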

  • timeout (Integer)

    The timeout value after which it will raise a Timeout::Error

Returns:

  • (Array<Future>)

    A list of the set futures, in the order of being set.



198
199
200
# File 'lib/aws/flow/implementation.rb', line 198

# Timed variant of wait_for_function: hands the timeout, the predicate and
# the futures straight to wait_for_function_helper.
#
# @param timeout the timeout forwarded to wait_for_function_helper
# @param function the predicate forwarded to wait_for_function_helper
# @param futures [Array<Future>] the futures to wait on
# @return whatever wait_for_function_helper returns
def timed_wait_for_function(timeout, function, *futures)
  wait_limit = timeout
  wait_for_function_helper(wait_limit, function, *futures)
end

#wait_for_all(*futures) ⇒ Array<Future>

Blocks until all of the arguments are set.

Parameters:

  • futures (Array<Future>)

    A list of futures to wait for. The function will return only when all of them are set.

Returns:

  • (Array<Future>)

    A list of the set futures, in the order of being set.



150
151
152
# File 'lib/aws/flow/implementation.rb', line 150

# Waits until every one of the given futures has been set.
#
# @param futures [Array<Future>] the futures to wait on
# @return whatever wait_for_function returns
def wait_for_all(*futures)
  # Satisfied once as many futures have been set as were passed in.
  every_future_set = lambda do |set_futures, all_futures|
    set_futures.length == all_futures.length
  end
  wait_for_function(every_future_set, futures)
end

#wait_for_any(*futures) ⇒ Array<Future>

Blocks until any of the arguments are set.

Parameters:

  • futures (Array<Future>)

    A list of futures to wait for. The function will return when at least one of these is set.

Returns:

  • (Array<Future>)

    A list of the set futures, in the order of being set.



138
139
140
# File 'lib/aws/flow/implementation.rb', line 138

# Waits until at least one of the given futures has been set.
#
# @param futures [Array<Future>] the futures to wait on
# @return whatever wait_for_function returns
def wait_for_any(*futures)
  # Satisfied as soon as the list of set futures is non-empty.
  some_future_set = lambda do |set_futures, _all_futures|
    !set_futures.empty?
  end
  wait_for_function(some_future_set, futures)
end

#wait_for_function(function, *futures) ⇒ Array<Future>

Waits until the passed-in function, called with the list of futures set so far and the full list of futures, returns true.

Parameters:

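  • function

    A callable that is invoked with the list of futures set so far and the full list of futures; waiting ends once it returns a true value.

  • futures (Array<Future>)

    The futures to wait on. Each time one of them is set, the function is re-evaluated.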

Returns:

  • (Array<Future>)

    A list of the set futures, in the order of being set.



126
127
128
# File 'lib/aws/flow/implementation.rb', line 126

# Untimed wait: delegates to wait_for_function_helper with a nil timeout.
#
# @param function the predicate forwarded to wait_for_function_helper
# @param futures [Array<Future>] the futures to wait on
# @return whatever wait_for_function_helper returns
def wait_for_function(function, *futures)
  no_timeout = nil
  wait_for_function_helper(no_timeout, function, *futures)
end

#wait_for_function_helper(timeout, function, *futures) ⇒ Object

Helper method to refactor away the common implementation of wait_for_function and timed_wait_for_function.



204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# File 'lib/aws/flow/implementation.rb', line 204

# Shared implementation behind wait_for_function and timed_wait_for_function.
#
# The futures are flattened into one list, which must consist either
# entirely of ExternalFuture objects or contain none at all. The predicate
# +function+ is called with (futures set so far, all futures); when it is not
# yet true, an on_set callback is registered on every future and the method
# waits on a condition variable that is broadcast once the predicate holds.
#
# NOTE(review): when the predicate already holds on entry, the first set
# future (a single object, or nil if none is set) is returned rather than an
# array -- kept as is because callers may rely on it.
# NOTE(review): +timeout+ is only passed to the wait call in the
# non-FiberConditionVariable branch; the fiber branch waits without it.
#
# @param timeout the timeout handed to the external condition variable's wait
# @param function a callable taking (set_futures, all_futures)
# @param futures the futures to wait on; nested arrays are flattened
# @return [nil] when no futures are given
# @return the first set future when the predicate already holds on entry
# @return [Array] the futures collected as set, otherwise
# @raise [ArgumentError] when ExternalFuture and other futures are mixed
# @raise [Timeout::Error] when the external wait returns and the predicate
#   still does not hold
def wait_for_function_helper(timeout, function, *futures)
  futures.flatten!

  external = futures.select { |candidate| candidate.is_a?(ExternalFuture) }

  mixed_kinds = external.size > 0 && external.size != futures.size
  if mixed_kinds
    raise ArgumentError, "The futures array must contain either all "\
      "objects of Future or all objects of ExternalFuture"
  end

  # The condition variable is created before the empty check, as before.
  conditional =
    if external.size == 0
      FiberConditionVariable.new
    else
      ExternalConditionVariable.new
    end

  return nil if futures.empty?

  result = futures.select(&:set?)
  if function.call(result, futures)
    return futures.find(&:set?)
  end

  futures.each do |pending|
    pending.on_set do |set_one|
      result << set_one
      conditional.broadcast if function.call(result, futures)
    end
  end

  if conditional.is_a?(FiberConditionVariable)
    conditional.wait
    return result
  end

  conditional.wait(timeout)
  raise Timeout::Error.new unless function.call(result, futures)
  result
end