Module: AWS::Flow::Core

Defined in:
lib/aws/flow/fiber.rb,
lib/aws/flow/tasks.rb,
lib/aws/flow/future.rb,
lib/aws/flow/flow_utils.rb,
lib/aws/flow/simple_dfa.rb,
lib/aws/flow/async_scope.rb,
lib/aws/flow/implementation.rb,
lib/aws/flow/async_backtrace.rb,
lib/aws/flow/begin_rescue_ensure.rb

Defined Under Namespace

Modules: SimpleDFA Classes: AlreadySetException, AsyncBacktrace, AsyncEventLoop, AsyncScope, BeginRescueEnsure, BeginRescueEnsureWrapper, CancellationException, DaemonBeginRescueEnsure, DaemonTask, ExternalTask, ExternalTaskCompletionHandle, FiberConditionVariable, FlowFiber, Future, IllegalStateException, NoContextException, RootAsyncScope, Task, TaskContext

Instance Method Summary collapse

Instance Method Details

#_error_handler(&block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



110
111
112
# File 'lib/aws/flow/implementation.rb', line 110

def _error_handler(&block)
  error_handler(&block).result
end

#daemon_task(&block) ⇒ Future

Returns The tasks result, which is a Future.

Parameters:

  • block

    The block of code to be executed when the daemon task is run.

Returns:

Raises:

  • (NoContextException)

    If the current fiber does not respond to ‘Fiber.__context__`.



56
57
58
59
60
61
62
63
64
# File 'lib/aws/flow/implementation.rb', line 56

def daemon_task(&block)
  fiber = ::Fiber.current
  raise NoContextException unless fiber.respond_to? :__context__
  context = fiber.__context__
  t = DaemonTask.new(nil, &block)
  task_context = TaskContext.new(:parent => context.get_closest_containing_scope, :task => t)
  context << t
  t.result
end

#error_handler(&block) ⇒ Object

Creates a new error handler for asynchronous tasks.

Parameters:

Returns:

  • The result of the ‘begin` statement if there is no error; otherwise the value of the `return` statement.

Raises:

  • (NoContextException)

    If the current fiber does not respond to ‘Fiber.__context__`.



97
98
99
100
101
102
103
104
105
106
# File 'lib/aws/flow/implementation.rb', line 97

def error_handler(&block)
  fiber = ::Fiber.current
  raise NoContextException unless fiber.respond_to? :__context__
  context = fiber.__context__
  begin_rescue_ensure = BeginRescueEnsure.new(:parent => context.get_closest_containing_scope)
  bge = BeginRescueEnsureWrapper.new(block, begin_rescue_ensure)
  context << bge
  context << begin_rescue_ensure
  begin_rescue_ensure
end

#external_task(&block) ⇒ nil

Parameters:

  • block

    The block of code to be executed when the external task is run.

Returns:

  • (nil)

Raises:

  • (NoContextException)

    If the current fiber does not respond to ‘Fiber.__context__`.



75
76
77
78
79
80
81
82
83
# File 'lib/aws/flow/implementation.rb', line 75

def external_task(&block)
  fiber = ::Fiber.current
  raise NoContextException unless fiber.respond_to? :__context__
  context = fiber.__context__
  t = ExternalTask.new(:parent => context.get_closest_containing_scope, &block)
  task_context = TaskContext.new(:parent => context.get_closest_containing_scope, :task => t)
  context << t
  nil
end

#gate_by_version(version, method, &block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



23
24
25
26
27
# File 'lib/aws/flow/async_scope.rb', line 23

def gate_by_version(version, method, &block)
  if RUBY_VERSION.send(method, version)
    block.call
  end
end

#make_backtrace(parent_backtrace) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/aws/flow/flow_utils.rb', line 32

def make_backtrace(parent_backtrace)
  # 1 frame for the function that actually removes the stack traces.
  # 1 frame for the function that calls into the function that removes
  # frames in AsyncBacktrace.
  # 1 frame for the call into this function.
  # 1 frame for the initialize call of the BeginRescueEnsure or ExternalTask.
  # 1 frame for the new call into the BeginRescueEnsure or ExternalTask.
  # 1 frame for the AsyncScope initialize that the BeginRescueEnsure/ExternalTask has to be in.

  # "./lib/aws/rubyflow/asyncBacktrace.rb:75:in `caller'"
  # "./lib/aws/rubyflow/asyncBacktrace.rb:21:in `create'"
  # "./lib/aws/rubyflow/flow.rb:16:in `make_backtrace'"
  # "./lib/aws/rubyflow/flow.rb:103:in `initialize'"
  # "./lib/aws/rubyflow/asyncScope.rb:17:in `new'"
  # "./lib/aws/rubyflow/asyncScope.rb:17:in `initialize'"

  frames_to_skip = 7
  backtrace = AsyncBacktrace.create(parent_backtrace, frames_to_skip)
end

#task(future = nil, &block) ⇒ Future

Returns The tasks result, which is a Future.

Parameters:

  • future (Future) (defaults to: nil)

    Unused; defaults to nil.

  • block

    The block of code to be executed when the task is run.

Returns:

Raises:

  • (NoContextException)

    If the current fiber does not respond to ‘Fiber.__context__`.



36
37
38
39
40
41
42
43
44
# File 'lib/aws/flow/implementation.rb', line 36

def task(future = nil, &block)
  fiber = ::Fiber.current
  raise NoContextException unless fiber.respond_to? :__context__
  context = fiber.__context__
  t = Task.new(nil, &block)
  task_context = TaskContext.new(:parent => context.get_closest_containing_scope, :task => t)
  context << t
  t.result
end

#wait_for_all(*futures) ⇒ Array<Future>

Blocks until all of the arguments are set.

Parameters:

  • futures (Array<Future>)

    A list of futures to wait for. The function will return only when all of them are set.

Returns:

  • (Array<Future>)

    A list of the set futures, in the order of being set.



162
163
164
# File 'lib/aws/flow/implementation.rb', line 162

def wait_for_all(*futures)
  wait_for_function(lambda {|result, future_list| result.size == future_list.size}, futures)
end

#wait_for_any(*futures) ⇒ Array<Future>

Blocks until any of the arguments are set.

Parameters:

  • futures (Array<Future>)

    A list of futures to wait for. The function will return when at least one of these is set.

Returns:

  • (Array<Future>)

    A list of the set futures, in the order of being set.



150
151
152
# File 'lib/aws/flow/implementation.rb', line 150

def wait_for_any(*futures)
  wait_for_function(lambda {|result, future_list| result.length >= 1 }, futures)
end

#wait_for_function(function, *futures) ⇒ Array<Future>

Waits for the passed-in function to complete, setting values for the provided futures when it does.

Parameters:

  • function

    The function to wait for.

  • futures (Array<Future>)

    A list of futures to provide values for when the function completes.

Returns:

  • (Array<Future>)

    A list of the set futures, in the order of being set.



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/aws/flow/implementation.rb', line 126

def wait_for_function(function, *futures)
  conditional = FiberConditionVariable.new
  futures.flatten!
  return nil if futures.empty?
  result = futures.select(&:set?)
  return futures.find(&:set?)if function.call(result, futures)
  futures.each do |f|
    f.on_set do |set_one|
      result << set_one
      conditional.broadcast if function.call(result, futures)
    end
  end
  conditional.wait
  result
end