Module: AWS::Flow::Templates Private

Defined in:
lib/aws/templates/starter.rb,
lib/aws/templates/base.rb,
lib/aws/templates/default.rb,
lib/aws/templates/activity.rb

This module is part of a private API. You should avoid using this module if possible, as it may be removed or be changed in the future.

Defined Under Namespace

Modules: ActivityProxies

Classes: ActivityTemplate, FlowDefaultResultActivityRuby, FlowDefaultWorkflowRuby, RootTemplate, TemplateBase

Class Method Details

.activity(name, opts = {}) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Initializes an activity template

Parameters:

  • name (String)
  • opts (Hash)


# File 'lib/aws/templates/activity.rb', line 63

def self.activity(name, opts = {})
  ActivityTemplate.new(name, opts)
end

.default_workflow ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the default workflow class



# File 'lib/aws/templates/default.rb', line 140

def self.default_workflow
  return AWS::Flow::Templates.const_get(FlowConstants.defaults[:prefix_name])
end
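
The lookup above resolves a class from a name stored in a defaults hash. A minimal sketch of that pattern, assuming a hypothetical `SketchTemplates` module and a hypothetical `DEFAULTS` hash in place of `FlowConstants.defaults` (the value stored under `:prefix_name` is an assumption here):

```ruby
# Hypothetical module standing in for AWS::Flow::Templates.
module SketchTemplates
  # Stand-in for the default workflow class.
  class FlowDefaultWorkflowRuby; end

  # Hypothetical defaults; the library keeps its own in FlowConstants.defaults.
  DEFAULTS = { prefix_name: "FlowDefaultWorkflowRuby" }.freeze

  # Resolves the class object from its name, relative to this module.
  def self.default_workflow
    const_get(DEFAULTS[:prefix_name])
  end
end

workflow_class = SketchTemplates.default_workflow
```

Keeping the class name in configuration lets the default be swapped without touching the callers that ask for it.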

.get_result(tasklist, domain, timeout = nil) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Gets the result of the workflow execution by starting an ActivityWorker on an instance of the FlowDefaultResultActivityRuby class. The result activity sets that instance's result future to the result of the template. This method blocks until either the result future is set or the timeout expires, whichever comes first. On timeout it returns nil; if the result is a Hash whose :failure entry is an Exception, that exception is raised.



# File 'lib/aws/templates/starter.rb', line 176

def self.get_result(tasklist, domain, timeout=nil)

  swf = AWS::SimpleWorkflow.new
  domain = swf.domains[domain]

  # Create a new instance of the FlowDefaultResultActivityRuby class and
  # add it to the ActivityWorker. We pass in the instance instead of the
  # class itself, so that we can locally access the instance variable set
  # by the activity method.
  activity = FlowDefaultResultActivityRuby.new

  # Create the activity worker to poll on the result tasklist
  worker = AWS::Flow::ActivityWorker.new(domain.client, domain, tasklist, activity) {{ use_forking: false }}

  # Keep polling till we get the result or timeout. A 0 or nil timeout
  # will let the loop run to completion.
  begin
    Timeout::timeout(timeout) do
      until activity.result.set?
        worker.run_once(false)
      end
    end
  rescue Timeout::Error => e
    activity.result.set
    return
  end

  # Get the result from the future
  result = activity.result.get
  if result.is_a?(Hash) && result[:failure] && result[:failure].is_a?(Exception)
    raise result[:failure]
  end

  result
end
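
The wait loop above can be tried in isolation, without SWF. This is a minimal sketch, assuming a hypothetical `SimpleFuture` class in place of the library's future and a block in place of `worker.run_once`; neither name is part of the library.

```ruby
require 'timeout'

# Hypothetical stand-in for the result future; not the library's Future class.
class SimpleFuture
  def initialize
    @set = false
    @value = nil
  end

  def set(value = nil)
    return if @set # the first value wins
    @set = true
    @value = value
  end

  def set?
    @set
  end

  def get
    @value
  end
end

# Calls the block repeatedly until the future is set or the timeout expires.
# Returns nil on timeout, mirroring the early return in get_result.
def wait_for_result(future, timeout = nil)
  begin
    Timeout.timeout(timeout) do
      yield until future.set?
    end
  rescue Timeout::Error
    future.set
    return nil
  end
  future.get
end

# Usage: the third poll delivers a result.
polls = 0
future = SimpleFuture.new
result = wait_for_result(future, 5) do
  polls += 1
  future.set("done") if polls == 3
end
```

A nil timeout disables the deadline in `Timeout.timeout`, so the loop then runs until the future is set.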

.make_activity_class(klass) ⇒ Object

Converts a regular Ruby class into a Ruby Flow activity class, i.e. one that extends the AWS::Flow::Activities module. It wraps the class in a proxy, turns every user-defined instance method into an activity, and assigns the following defaults to the ActivityType: version: “1.0” and prefix_name: the name of the class.



# File 'lib/aws/templates/default.rb', line 60

def self.make_activity_class(klass)
  return klass if klass.nil?

  name = klass.name.split(":").last

  proxy_name = name + "Proxy"
  # Create a proxy activity class that will define activities for all
  # instance methods of the class.
  new_klass = self::ActivityProxies.const_set(proxy_name.to_sym, Class.new(Object))

  # Extend the AWS::Flow::Activities module and create activities for all
  # instance methods
  new_klass.class_exec do
    extend AWS::Flow::Activities

    attr_reader :instance

    @@klass = klass

    def initialize
      @instance = @@klass.new
    end

    # Creates activities for all instance methods of the held klass
    @@klass.instance_methods(false).each do |method|
      activity(method) do
        {
          version: "1.0",
          prefix_name: name
        }
      end
    end

    # Redirect all method calls to the held instance
    def method_missing(method, *args, &block)
      @instance.send(method, *args, &block)
    end

  end
  new_klass
end
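
The proxy-and-delegate idea above can be sketched without the Flow framework. This is a minimal sketch under stated assumptions: `Greeter` and `make_proxy_class` are hypothetical names, and the step that registers each method as an activity is left out. It captures the wrapped class in a closure instead of a class variable, since class variables referenced inside a block are resolved against the enclosing lexical scope rather than the newly created class.

```ruby
# Hypothetical user class; any plain Ruby class would do.
class Greeter
  def say_hello(name)
    "Hello, #{name}!"
  end
end

# Builds a proxy class that holds an instance of klass and forwards calls to it.
def make_proxy_class(klass)
  return klass if klass.nil?

  Class.new do
    attr_reader :instance

    # define_method takes a closure, so klass stays visible here.
    define_method(:initialize) do
      @instance = klass.new
    end

    # Forward only the methods the wrapped instance responds to.
    def method_missing(method, *args, &block)
      if @instance.respond_to?(method)
        @instance.send(method, *args, &block)
      else
        super
      end
    end

    # Keep respond_to? consistent with method_missing.
    def respond_to_missing?(method, include_private = false)
      @instance.respond_to?(method, include_private) || super
    end
  end
end

proxy = make_proxy_class(Greeter).new
greeting = proxy.say_hello("World")

# The methods that would be turned into activities.
exposed = Greeter.instance_methods(false)
```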

.register_default_domain ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Registers the default domain FlowDefault with the Simple Workflow Service



# File 'lib/aws/templates/starter.rb', line 224

def self.register_default_domain
  AWS::Flow::Utilities.register_domain(FlowConstants.defaults[:domain])
end

.register_default_result_activity(domain) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Registers the default result activity type FlowDefaultResultActivityRuby with the Simple Workflow Service



# File 'lib/aws/templates/starter.rb', line 243

def self.register_default_result_activity(domain)
  worker = AWS::Flow::ActivityWorker.new(
    domain.client,
    domain,
    nil,
    AWS::Flow::Templates.result_activity
  ) {{ use_forking: false }}
  worker.register
end

.register_default_workflow(domain) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Registers the default workflow type FlowDefaultWorkflowRuby with the Simple Workflow Service



# File 'lib/aws/templates/starter.rb', line 231

def self.register_default_workflow(domain)
  AWS::Flow::WorkflowWorker.new(
    domain.client,
    domain,
    nil,
    AWS::Flow::Templates.default_workflow
  ).register
end

.register_defaults(name = nil) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Registers the relevant defaults with the Simple Workflow Service



# File 'lib/aws/templates/starter.rb', line 214

def self.register_defaults(name=nil)
  domain = name.nil? ? register_default_domain : AWS::SimpleWorkflow.new.domains[name]

  register_default_workflow(domain)
  register_default_result_activity(domain)
end
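
In this module, .start calls register_defaults only after a first start attempt fails with UnknownResourceFault, then retries once. A minimal sketch of that register-on-demand pattern, assuming a hypothetical `FakeService` and `UnknownResourceError` in place of SWF and its fault class:

```ruby
# Hypothetical error standing in for SWF's UnknownResourceFault.
class UnknownResourceError < StandardError; end

# Hypothetical service that rejects work until its types are registered.
class FakeService
  attr_reader :registered

  def initialize
    @registered = false
  end

  def register
    @registered = true
  end

  def start_workflow(input)
    raise UnknownResourceError, "type not registered" unless @registered
    "started: #{input}"
  end
end

# Try once; on an unknown-resource failure, register and retry exactly once.
# A failure on the second attempt propagates to the caller.
def start_with_lazy_registration(service, input)
  service.start_workflow(input)
rescue UnknownResourceError
  service.register
  service.start_workflow(input)
end

service = FakeService.new
outcome = start_with_lazy_registration(service, "hello")
```

Registering lazily avoids a registration round trip on every start once the defaults already exist.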

.result_activity ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the default result activity class



# File 'lib/aws/templates/default.rb', line 134

def self.result_activity
  return AWS::Flow::Templates.const_get(FlowConstants.defaults[:result_activity_prefix])
end

.root(step, result_step = nil) ⇒ Object

Initializes a root template

Parameters:

  • step (TemplateBase)

    An AWS Flow Framework Template class that inherits from TemplateBase. It contains the actual orchestration of the workflow logic.

  • result_step (ActivityTemplate) (defaults to: nil)

    An optional ActivityTemplate that can be used to report the result of the ‘step’



# File 'lib/aws/templates/base.rb', line 80

def self.root(step, result_step = nil)
  RootTemplate.new(step, result_step)
end

.set_result_activity(root) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Sets the result activity with a unique tasklist name for the root template.



# File 'lib/aws/templates/starter.rb', line 151

def self.set_result_activity(root)
  # We want the result to be sent to a specific tasklist so that no other
  # worker gets the result of this workflow.
  result_tasklist = "result_tasklist: #{SecureRandom.uuid}"

  name = "#{FlowConstants.defaults[:result_activity_prefix]}."\
    "#{FlowConstants.defaults[:result_activity_method]}"

  # Set the result_step of the root template to the result activity and
  # override the tasklist and timeouts.
  root.result_step = activity(name, {
      task_list: result_tasklist,
      schedule_to_start_timeout: FlowConstants.defaults[:schedule_to_start_timeout],
      start_to_close_timeout: FlowConstants.defaults[:start_to_close_timeout]
    }
  )
  result_tasklist
end
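
The unique task list is what keeps other workers from picking up the result. A minimal sketch of building such options, assuming a hypothetical `RESULT_DEFAULTS` hash with illustrative timeout values (the library reads its own from `FlowConstants.defaults`):

```ruby
require 'securerandom'

# Hypothetical timeout values, chosen only for illustration.
RESULT_DEFAULTS = {
  schedule_to_start_timeout: 60,
  start_to_close_timeout: 60
}.freeze

# Builds options for a result step that only one waiting caller polls.
def result_step_options
  tasklist = "result_tasklist: #{SecureRandom.uuid}"
  RESULT_DEFAULTS.merge({ task_list: tasklist })
end

first = result_step_options
second = result_step_options
```

Each call produces a different task list name, so two concurrent callers never share a result queue.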

.start(name_or_klass, input, opts = {}) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Starts an Activity or a Workflow Template execution using the default workflow class FlowDefaultWorkflowRuby

Usage -

AWS::Flow::start("<ActivityClassName>.<method_name>", <input_hash>,
<options_hash> )

Example -

1) Start an activity execution -

AWS::Flow::start("HelloWorldActivity.say_hello", { name: "World" })

2) Start an activity execution with overridden options -

AWS::Flow::start("HelloWorldActivity.say_hello", { name: "World" }, {
  exponential_retry: { maximum_attempts: 10 } }
)

Parameters:

  • name_or_klass (String or AWS::Flow::Templates::TemplateBase)

    The Activity or the Workflow Template to schedule via the default workflow. This argument can be either a string holding a fully qualified activity name (<ActivityClass>.<method_name>) or an instance of AWS::Flow::Templates::TemplateBase.

  • input (Hash)

    Input hash for the workflow execution

  • opts (Hash) (defaults to: {})

    Additional options to configure the workflow or activity execution.

Options Hash (opts):

  • :wait (true, false)

    Optional. Set this boolean flag to true if the result of the task is required. Default value is false.

  • :wait_timeout (Integer)

    Optional. Sets the timeout value for :wait. Default value is nil.

  • :exponential_retry (Hash)

    A hash of ExponentialRetryOptions. Default value is { maximum_attempts: 3 }

  • :domain (String)

    Optional. Default value is FlowDefault

  • :execution_start_to_close_timeout (Integer)

    Optional. Default value is 3600 seconds (1 hour)

  • :retention_in_days (Integer)

    Optional. Default value is 7 days

  • :workflow_id (String)

    Optional.

  • :task_priority (Integer)

    Optional. Default value is 0

  • :tag_list (String)

    Optional. By default, the name of the activity task gets added to the workflow’s tag_list

  • :data_converter (Object)

    Optional. Default value is YAMLDataConverter. To use the S3DataConverter, set the AWS_SWF_BUCKET_NAME environment variable to a valid Amazon S3 bucket name.

  • *Optional* (Object)

    A hash of ActivityOptions



# File 'lib/aws/templates/starter.rb', line 74

def self.start(name_or_klass, input, opts = {})

  options = opts.dup

  if name_or_klass.is_a?(String)
    # Add activity name as a tag to the workflow execution
    (options[:tag_list] ||= []) << name_or_klass

    # If name_or_klass passed in is a string, we are assuming the user is
    # trying to start a single activity task. Wrap the activity information
    # in the activity template
    name_or_klass = AWS::Flow::Templates.activity(name_or_klass, options)

    # Keep only the required options in the hash
    keys = [
      :domain,
      :retention_in_days,
      :execution_start_to_close_timeout,
      :task_priority,
      :wait,
      :wait_timeout,
      :workflow_id,
      :data_converter,
      :tag_list
    ]
    options.select! { |x| keys.include?(x) }

  end

  # Wrap the template in a root template
  root = AWS::Flow::Templates.root(name_or_klass)

  # Get the default options and merge them with the options passed in. The
  # order of the two hashes 'defaults' and 'options' is important here.
  defaults = FlowConstants.defaults.select do |key|
    [
      :domain,
      :prefix_name,
      :execution_method,
      :version,
      :execution_start_to_close_timeout,
      :data_converter,
      :task_list
    ].include?(key)
  end
  options = defaults.merge(options)

  raise "input needs to be a Hash" unless input.is_a?(Hash)

  # Set the input for the default workflow
  workflow_input = {
    definition: root,
    args: input,
  }

  # Set the result_step for the root template if wait flag is
  # set.
  wait = options.delete(:wait)
  wait_timeout = options.delete(:wait_timeout)
  result_tasklist = set_result_activity(root) if wait

  # Call #start_workflow with the correct options to start the workflow
  # execution
  begin
    AWS::Flow::start_workflow(workflow_input, options)
  rescue AWS::SimpleWorkflow::Errors::UnknownResourceFault => e
    register_defaults(options[:domain])
    AWS::Flow::start_workflow(workflow_input, options)
  end

  # Wait for result
  get_result(result_tasklist, options[:domain], wait_timeout) if wait

end
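
The option handling above filters the caller's hash down to workflow-level keys and then merges it over the defaults, so caller values win. A minimal sketch of that step, assuming hypothetical `DEFAULTS` and `WORKFLOW_KEYS` constants with a shortened key list and illustrative values:

```ruby
# Hypothetical defaults; the real values live in FlowConstants.defaults.
DEFAULTS = {
  domain: "FlowDefault",
  version: "1.0",
  execution_start_to_close_timeout: 3600,
  maximum_attempts: 3 # not a workflow option, so it is filtered out below
}.freeze

# Shortened, illustrative list of keys that apply to the workflow itself.
WORKFLOW_KEYS = [:domain, :version, :execution_start_to_close_timeout,
                 :wait, :wait_timeout, :tag_list].freeze

def workflow_options(opts)
  options = opts.dup # never mutate the caller's hash
  options.select! { |key, _| WORKFLOW_KEYS.include?(key) }
  defaults = DEFAULTS.select { |key, _| WORKFLOW_KEYS.include?(key) }
  defaults.merge(options) # the caller's values win over the defaults
end

caller_opts = { domain: "MyDomain", wait: true,
                exponential_retry: { maximum_attempts: 10 } }
merged = workflow_options(caller_opts)
```

The receiver of `merge` supplies the fallback values, which is why the order of the two hashes matters: swapping them would let the defaults overwrite what the caller asked for.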

Instance Method Details

#activity(name, opts = {}) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Initializes an activity template

Parameters:

  • name (String)
  • opts (Hash)


# File 'lib/aws/templates/activity.rb', line 56

def activity(name, opts = {})
  AWS::Flow::Templates.send(:activity, name, opts)
end

#root(step, result_step = nil) ⇒ Object

Initializes a root template

Parameters:

  • step (TemplateBase)

    An AWS Flow Framework Template class that inherits from TemplateBase. It contains the actual orchestration of the workflow logic.

  • result_step (ActivityTemplate) (defaults to: nil)

    An optional ActivityTemplate that can be used to report the result of the ‘step’



# File 'lib/aws/templates/base.rb', line 69

def root(step, result_step = nil)
  AWS::Flow::Templates.send(:root, step, result_step)
end