Module: AWS::Flow

Included in:
Replayer, Runner
Defined in:
lib/aws/utils.rb,
lib/aws/runner.rb,
lib/aws/replayer.rb,
lib/aws/flow/fiber.rb,
lib/aws/flow/tasks.rb,
lib/aws/flow/future.rb,
lib/aws/decider/worker.rb,
lib/aws/templates/base.rb,
lib/aws/decider/decider.rb,
lib/aws/decider/options.rb,
lib/aws/decider/starter.rb,
lib/aws/decider/version.rb,
lib/aws/flow/flow_utils.rb,
lib/aws/flow/simple_dfa.rb,
lib/aws/decider/activity.rb,
lib/aws/decider/executor.rb,
lib/aws/flow/async_scope.rb,
lib/aws/templates/result.rb,
lib/aws/decider/utilities.rb,
lib/aws/templates/default.rb,
lib/aws/templates/starter.rb,
lib/aws/decider/exceptions.rb,
lib/aws/templates/activity.rb,
lib/aws/decider/task_poller.rb,
lib/aws/flow/implementation.rb,
lib/aws/templates/utilities.rb,
lib/aws/decider/task_handler.rb,
lib/aws/flow/async_backtrace.rb,
lib/aws/decider/async_decider.rb,
lib/aws/decider/flow_defaults.rb,
lib/aws/decider/data_converter.rb,
lib/aws/decider/generic_client.rb,
lib/aws/decider/history_helper.rb,
lib/aws/decider/implementation.rb,
lib/aws/decider/state_machines.rb,
lib/aws/decider/workflow_clock.rb,
lib/aws/decider/workflow_client.rb,
lib/aws/decider/decision_context.rb,
lib/aws/decider/workflow_enabled.rb,
lib/aws/flow/begin_rescue_ensure.rb,
lib/aws/decider/activity_definition.rb,
lib/aws/decider/workflow_definition.rb,
lib/aws/decider/async_retrying_executor.rb,
lib/aws/decider/workflow_definition_factory.rb

Defined Under Namespace

Modules: Activities, Activity, Core, Decider, DecisionStateMachineDFA, GenericTypeModule, Replayer, Runner, Templates, Utilities, Utils, Workflows

Classes: ActivityClient, ActivityDecisionStateMachine, ActivityDefaults, ActivityDefinition, ActivityExecutionContext, ActivityFailureException, ActivityMetadata, ActivityOptions, ActivityRegistrationDefaults, ActivityRegistrationOptions, ActivityRuntimeOptions, ActivityTaskFailedException, ActivityTaskPoller, ActivityTaskTimedOutException, ActivityType, ActivityWorker, AsyncDecider, AsyncRetryingExecutor, ChildWorkflowDecisionStateMachine, ChildWorkflowException, ChildWorkflowFailedException, ChildWorkflowTerminatedException, ChildWorkflowTimedOutException, CompleteWorkflowStateMachine, ContinueAsNewOptions, DecisionContext, DecisionException, DecisionHelper, DecisionID, DecisionStateMachineBase, DecisionTaskHandler, DecisionWrapper, Defaults, EventsIterator, ExponentialRetryOptions, FailWorkflowExecutionException, FlowConstants, FlowException, ForkingExecutor, GenericActivityClient, GenericClient, GenericType, GenericWorker, GenericWorkflowClient, HistoryHelper, LogMock, MethodPair, MinimalDomain, MinimalWorkflowExecution, NoInput, OpenRequestInfo, Options, RejectedExecutionException, RetryDefaults, RetryOptions, RetryPolicy, S3DataConverter, ScheduleActivityTaskFailedException, SignalDecisionStateMachine, SignalExternalWorkflowException, SignalWorkflowOptions, SingleDecisionData, SingleDecisionIterator, StartChildWorkflowFailedException, StartTimerFailedException, StartWorkflowOptions, SuspendableSemaphore, TimerDecisionStateMachine, WorkerDefaults, WorkerOptions, WorkflowClient, WorkflowClock, WorkflowContext, WorkflowDefaults, WorkflowDefinition, WorkflowDefinitionFactory, WorkflowException, WorkflowFactory, WorkflowFuture, WorkflowOptions, WorkflowRegistrationDefaults, WorkflowRegistrationOptions, WorkflowTaskPoller, WorkflowType, WorkflowWorker, YAMLDataConverter

Class Method Details

.decision_context ⇒ Object



# File 'lib/aws/decider/implementation.rb', line 62

def decision_context
  FlowFiber.current[:decision_context]
end

.on_windows? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/aws/decider/utilities.rb', line 21

def self.on_windows?
  require 'rbconfig'
  (RbConfig::CONFIG['host_os'] =~ /mswin|mingw/).nil? == false
end
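
As a minimal sketch of the check above, the helper below applies the same regular expression to a host OS string passed in as an argument, so the behavior can be tried on any platform. The name windows_host? is hypothetical and is not part of AWS::Flow.

```ruby
require 'rbconfig'

# Hypothetical helper (not part of AWS::Flow): the same regex test as
# on_windows?, with the host OS string as a parameter.
def windows_host?(host_os = RbConfig::CONFIG['host_os'])
  # =~ returns the match index or nil; !! converts that to true or false.
  !!(host_os =~ /mswin|mingw/)
end

puts windows_host?('mingw32')    # true
puts windows_host?('linux-gnu')  # false
puts windows_host?('darwin21')   # false
```

Matching on mswin and mingw rather than a bare "win" keeps macOS hosts, whose host_os contains "darwin", from being reported as Windows.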

.start(name_or_klass, input, options = {}) ⇒ Object

Starts an Activity or a Workflow Template execution using the default workflow class FlowDefaultWorkflowRuby.

Usage -

AWS::Flow::start("<ActivityClassName>.<method_name>", <input_hash>,
<options_hash> )

Example -

1) Start an activity execution -

AWS::Flow::start("HelloWorldActivity.say_hello", { name: "World" })

2) Start an activity execution with overridden options -

AWS::Flow::start("HelloWorldActivity.say_hello", { name: "World" }, {
  exponential_retry: { maximum_attempts: 10 } }
)

Parameters:

  • name_or_klass (String or AWS::Flow::Templates::TemplateBase)

    The Activity or Workflow Template to schedule via the default workflow. This argument is either a string holding a fully qualified activity name (<ActivityClass>.<method_name>) or an instance of AWS::Flow::Templates::TemplateBase.

  • input (Hash)

    Input hash for the workflow execution

  • options (Hash) (defaults to: {})

    Additional options to configure the workflow or activity execution.



# File 'lib/aws/decider/starter.rb', line 200

def self.start(name_or_klass, input, options = {})
  AWS::Flow::Templates::Starter.start(name_or_klass, input, options)
end

.start_workflow(workflow = nil, input, opts) ⇒ Object

Utility method used to start a workflow execution with the service.

Usage -

1) Passing a fully qualified workflow <prefix_name>.<execution_method> name -

AWS::Flow::start_workflow("HelloWorkflow.say_hello", "world", {
  domain: "FooDomain",
  version: "1.0"
  ...
})

2) Passing workflow class name with other details in the options hash -

AWS::Flow::start_workflow("HelloWorkflow", "world", {
  domain: "FooDomain",
  execution_method: "say_hello",
  version: "1.0"
  ...
})

3) Acquiring options using the :from_class option -

AWS::Flow::start_workflow(nil, "hello", {
  domain: "FooDomain",
  from_class: "HelloWorkflow"
})

# This will take all the required options from the HelloWorkflow class.
# If the :execution_method option is not passed in, it will use the first
# workflow method in the class.

4) All workflow options are present in the options hash. This is the case when this method is called by AWS::Flow.start -

AWS::Flow::start_workflow(nil, "hello", {
  domain: "FooDomain",
  prefix_name: "HelloWorkflow",
  execution_method: "say_hello",
  version: "1.0",
  ...
})

Parameters:

  • workflow (String or Class (that extends AWS::Flow::Workflows)) (defaults to: nil)

    Represents an AWS Flow Framework workflow class. If not provided, details of the workflow must be passed via the opts Hash.

  • input (Hash)

    Input hash for the workflow execution

  • opts (Hash)

    Hash of options to configure the workflow execution

Options Hash (opts):

  • :domain (String)

    Required.

  • :version (String)

    Required.

  • :prefix_name (String)

    Optional. Must be specified if workflow is not passed in as an argument.

  • :execution_method (String)

    Optional. Must be specified if workflow is not passed in as an argument.

  • :from_class (String)

    Optional.

  • :workflow_id (String)

    Optional.

  • :execution_start_to_close_timeout (Integer)

    Optional.

  • :task_start_to_close_timeout (Integer)

    Optional.

  • :task_priority (Integer)

    Optional.

  • :task_list (String)

    Optional.

  • :child_policy (String)

    Optional.

  • :tag_list (Array)

    Optional.

  • :data_converter (Object)

    Optional.

Raises:

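  • (ArgumentError)

    If opts is nil or not a Hash, or if a required option such as :domain, :prefix_name, :execution_method, or :version is missing.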


# File 'lib/aws/decider/starter.rb', line 84

def self.start_workflow(workflow = nil, input, opts)

  raise ArgumentError, "Please provide an options hash" if opts.nil? || !opts.is_a?(Hash)

  options = opts.dup

  # Get the domain out of the options hash.
  domain = options.delete(:domain)

  raise ArgumentError, "You must provide a :domain in the options hash" if domain.nil?

  if options[:from_class]
    # Do nothing. Use options as they are. They will be taken care of in the
    # workflow client
  elsif workflow.nil?
    # This block is usually executed when #start_workflow is called from
    # #start. All options required to start the workflow must be present
    # in the options hash.
    prefix_name = options[:prefix_name] || options[:workflow_name]
    # Check if required options are present
    raise ArgumentError, "You must provide a :prefix_name in the options hash" unless prefix_name
    raise ArgumentError, "You must provide an :execution_method in the options hash" unless options[:execution_method]
    raise ArgumentError, "You must provide a :version in the options hash" unless options[:version]
  else
    # When a workflow class name is given along with some options

    # If a fully qualified workflow name is given, split it into prefix_name
    # and execution_method
    prefix_name, execution_method = workflow.to_s.split(".")
    # If a fully qualified name is not given, then look for it in the options
    # hash
    execution_method ||= options[:execution_method]

    # Make sure all required options are present
    raise ArgumentError, "You must provide an :execution_method in the options hash" unless execution_method
    raise ArgumentError, "You must provide a :version in the options hash" unless options[:version]

    # Set the :prefix_name and :execution_method options correctly
    options.merge!(
      prefix_name: prefix_name,
      execution_method: execution_method,
    )
  end

  swf = AWS::SimpleWorkflow.new
  domain = swf.domains[domain]

  # Get a workflow client for the domain
  client = workflow_client(domain.client, domain) { options }

  # Start the workflow execution
  client.start_execution(input)
end
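
The option handling in the listing above can be tried without an Amazon SWF connection. The following is a sketch only: resolve_start_options is a hypothetical standalone helper, not part of AWS::Flow, that mirrors the three validation branches and returns the domain name and the resolved options instead of starting an execution.

```ruby
# Hypothetical standalone helper (not part of AWS::Flow). It mirrors the
# validation branches of start_workflow but never contacts Amazon SWF.
def resolve_start_options(workflow, opts)
  raise ArgumentError, "Please provide an options hash" unless opts.is_a?(Hash)

  options = opts.dup # leave the caller's hash untouched
  domain = options.delete(:domain)
  raise ArgumentError, "You must provide a :domain in the options hash" if domain.nil?

  if options[:from_class]
    # Nothing to validate here; the named class supplies the options later.
  elsif workflow.nil?
    prefix_name = options[:prefix_name] || options[:workflow_name]
    raise ArgumentError, "You must provide a :prefix_name in the options hash" unless prefix_name
    raise ArgumentError, "You must provide an :execution_method in the options hash" unless options[:execution_method]
    raise ArgumentError, "You must provide a :version in the options hash" unless options[:version]
  else
    # "HelloWorkflow.say_hello" splits into prefix name and execution method.
    prefix_name, execution_method = workflow.to_s.split(".")
    execution_method ||= options[:execution_method]
    raise ArgumentError, "You must provide an :execution_method in the options hash" unless execution_method
    raise ArgumentError, "You must provide a :version in the options hash" unless options[:version]
    options.merge!(prefix_name: prefix_name, execution_method: execution_method)
  end

  [domain, options]
end

domain, options = resolve_start_options(
  "HelloWorkflow.say_hello", { domain: "FooDomain", version: "1.0" }
)
puts domain                      # FooDomain
puts options[:prefix_name]       # HelloWorkflow
puts options[:execution_method]  # say_hello
```

Note that in the :from_class branch neither :version nor :execution_method is checked at this stage, which matches the listing's comment that those options are handled in the workflow client.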

.version ⇒ Object



# File 'lib/aws/decider/version.rb', line 18

def self.version
  "3.1.0"
end

.with_retry(options = {}, &block) ⇒ Object

Execute a block with retries within a workflow context.

Parameters:

  • options (defaults to: {})

    The RetryOptions to use.

  • block

    The block to execute.



# File 'lib/aws/decider/implementation.rb', line 52

def with_retry(options = {}, &block)
  # TODO raise a specific error instead of a runtime error
  raise "with_retry can only be used inside a workflow context!" if Utilities::is_external
  retry_options = ExponentialRetryOptions.new(options)
  retry_policy = RetryPolicy.new(retry_options.retry_function, retry_options)
  async_retrying_executor = AsyncRetryingExecutor.new(retry_policy, self.decision_context.workflow_clock, retry_options.return_on_start)
  future = async_retrying_executor.execute(lambda { block.call })
  Utilities::drill_on_future(future) unless retry_options.return_on_start
end
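
The general idea of running a block with retries can be illustrated outside a workflow. The sketch below is a simplified, synchronous stand-in and not the library's mechanism: the listing above schedules retries asynchronously through AsyncRetryingExecutor and a RetryPolicy, while the hypothetical retry_block helper here reruns the block immediately. It assumes maximum_attempts counts total attempts, which may differ from how the library interprets :maximum_attempts.

```ruby
# Hypothetical, synchronous stand-in for illustration only. The real
# with_retry runs inside a workflow and schedules retries asynchronously.
def retry_block(maximum_attempts: 3)
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue StandardError
    # Rerun the begin block until the attempt budget is used up,
    # then re-raise the last error to the caller.
    retry if attempts < maximum_attempts
    raise
  end
end

result = retry_block(maximum_attempts: 5) do |attempt|
  raise "transient failure" if attempt < 3
  "succeeded on attempt #{attempt}"
end
puts result  # succeeded on attempt 3
```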

.workflow_client(service = nil, domain = nil, &block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/aws/decider/implementation.rb', line 71

def self.workflow_client(service = nil, domain = nil, &block)
  options = Utilities::interpret_block_for_options(StartWorkflowOptions, block)
  if ! Utilities::is_external
    service = AWS::SimpleWorkflow.new
    # So, we probably shouldn't be doing this, but we need to slightly
    # redesign where this is available from.
    domain = FlowFiber.current[:decision_context].workflow_context.decision_task.workflow_execution.domain
  else
    if service.nil? || domain.nil?
      raise "You must provide both a service and domain when using workflow client in an external setting"
    end
  end

  workflow_class_name = options.from_class || options.workflow_name
  workflow_class = get_const(workflow_class_name) rescue nil
  WorkflowClient.new(service, domain, workflow_class, options)
end

Instance Method Details

#workflow_client(service = nil, domain = nil, &block) ⇒ Object

Creates a new WorkflowClient instance.

Parameters:

  • service (defaults to: nil)

    An Amazon SWF service reference. This is usually created with:

    swf = AWS::SimpleWorkflow.new
    
  • domain (defaults to: nil)

    The Amazon SWF Domain to use for this workflow client. This is usually created on the service object, such as:

    domain = swf.domains.create('my-domain', 10)
    

    or retrieved from it (for existing domains):

    domain = swf.domains['my-domain']
    
  • block (Hash, StartWorkflowOptions)

    A block that supplies the options used to start the workflow, for example by returning a Hash.



# File 'lib/aws/decider/implementation.rb', line 40

def workflow_client(service = nil, domain = nil, &block)
  AWS::Flow.send(:workflow_client, service, domain, &block)
end
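
The calling convention, positional service and domain arguments plus a block that yields the options, can be sketched in miniature. Everything below is hypothetical and stands in for the library's classes: SketchClient and sketch_client are illustrative names, and the sketch raises ArgumentError where the library raises a plain RuntimeError.

```ruby
# Hypothetical miniature (not the library's classes): a client factory that,
# like workflow_client, reads its options from the value a block returns.
SketchClient = Struct.new(:service, :domain, :options)

def sketch_client(service = nil, domain = nil, &block)
  if service.nil? || domain.nil?
    raise ArgumentError, "You must provide both a service and a domain"
  end
  # Evaluate the block lazily; fall back to no options when none is given.
  options = block ? block.call : {}
  SketchClient.new(service, domain, options)
end

client = sketch_client(:swf, "my-domain") do
  { execution_method: "say_hello", version: "1.0" }
end
puts client.domain             # my-domain
puts client.options[:version]  # 1.0
```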

#workflow_factory(client, domain, &options) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method is for internal use only and may be changed or removed without prior notice. Use #workflow_client instead.


# File 'lib/aws/decider/workflow_enabled.rb', line 21

def workflow_factory(client, domain, &options)
  WorkflowFactory.new(client, domain,  options)
end