Class: AWS::Flow::WorkflowWorker

Inherits:
GenericWorker
Defined in:
lib/aws/decider/worker.rb

Overview

This worker class hosts a workflow implementation. It is configured with a task list and a workflow implementation, and it polls the specified task list for decision tasks. When a decision task is received, the worker creates an instance of the workflow implementation and calls its execution method to process the task.
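
The poll, instantiate, and dispatch cycle described above can be pictured with a small self-contained model. This is only a sketch under stated assumptions: `GreetingWorkflow`, `ToyWorkflowWorker`, and the Hash-shaped tasks are hypothetical stand-ins and are not part of the AWS Flow Framework; the task list is modelled as a plain Array.

```ruby
# Hypothetical workflow implementation (not part of the framework).
class GreetingWorkflow
  # Plays the role of the workflow's execution method.
  def greet(input)
    "Hello, #{input}!"
  end
end

# Hypothetical, simplified model of a workflow worker.
class ToyWorkflowWorker
  def initialize(task_list)
    @task_list = task_list # an Array standing in for an SWF task list
    @definitions = {}      # workflow type name => [class, execution method]
  end

  def add_implementation(type_name, klass, execution_method)
    @definitions[type_name] = [klass, execution_method]
  end

  # Takes one task from the task list and processes it.
  # Returns nil when the task list is empty.
  def run_once
    task = @task_list.shift
    return nil if task.nil?

    klass, execution_method = @definitions.fetch(task[:workflow_type])
    # A fresh instance of the implementation is created for each task.
    klass.new.public_send(execution_method, task[:input])
  end
end

task_list = [{ workflow_type: "GreetingWorkflow.greet", input: "SWF" }]
worker = ToyWorkflowWorker.new(task_list)
worker.add_implementation("GreetingWorkflow.greet", GreetingWorkflow, :greet)

result = worker.run_once
puts result
```

The real worker delegates polling to AWS::Flow::WorkflowTaskPoller and task handling to a decision task handler; the model only mirrors the order of the steps.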

Methods inherited from GenericWorker

#camel_case_to_snake_case, #resolve_default_task_list

Constructor Details

#initialize(service, domain, task_list, *args, &block) ⇒ WorkflowWorker

Creates a new WorkflowWorker instance.

Parameters:

  • service

    The service used with this workflow worker.

  • domain (String)

    The Amazon SWF domain to operate on.

  • task_list (Array)

    The default task list to poll for decision tasks.

  • args

    The decisions to use.



# File 'lib/aws/decider/worker.rb', line 130

def initialize(service, domain, task_list, *args, &block)
  @workflow_definition_map = {}
  @workflow_type_options = []
  @options = Utilities::interpret_block_for_options(WorkerOptions, block)

  @logger = @options.logger if @options
  @logger ||= Utilities::LogFactory.make_logger(self)
  @options.logger ||= @logger if @options

  super(service, domain, task_list, *args)
end

Instance Attribute Details

#workflow_type ⇒ Object

The workflow type for this workflow worker.



# File 'lib/aws/decider/worker.rb', line 114

def workflow_type
  @workflow_type
end

Instance Method Details

#add_implementation(workflow_class) ⇒ Object

Adds a workflow implementation class to this worker by delegating to #add_workflow_implementation.



# File 'lib/aws/decider/worker.rb', line 146

def add_implementation(workflow_class)
  add_workflow_implementation(workflow_class)
end

#add_workflow_implementation(workflow_class) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Called by #add_implementation.



# File 'lib/aws/decider/worker.rb', line 152

def add_workflow_implementation(workflow_class)
  workflow_class.workflows.delete_if do |workflow_type|
    workflow_type.version.nil? || workflow_type.name.nil?
  end
  workflow_class.workflows.each do |workflow_type|
    options = workflow_type.options
    execution_method = options.execution_method
    version = workflow_type.version
    registration_options = options.get_registration_options
    implementation_options = nil
    get_state_method = workflow_class.get_state_method
    signals = workflow_class.signals

    @workflow_definition_map[workflow_type] = WorkflowDefinitionFactory.new(
      workflow_class,
      workflow_type,
      registration_options,
      options,
      execution_method,
      signals,
      get_state_method
    )
    # TODO should probably do something like
    # GenericWorkflowWorker#registerWorkflowTypes
    workflow_hash = options.get_options(
      [
        :default_task_start_to_close_timeout,
        :default_execution_start_to_close_timeout,
        :default_child_policy
      ], {
        :domain => @domain.name,
        :name => workflow_type.name,
        :version => version
      }
    )

    if options.default_task_list
      workflow_hash.merge!(
        :default_task_list => {:name => resolve_default_task_list(options.default_task_list)}
      )
    end
    @workflow_type_options << workflow_hash
  end
end
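
The way each registration hash is assembled in the listing above can be sketched with plain Hashes. This is a rough model, not the library's code: `build_registration_hash` is a hypothetical helper, the options object is replaced by a Hash, and the sketch assumes that unset (nil) options are left out of the result, which the listing does not confirm.

```ruby
# Option keys that the listing above passes on for registration.
REGISTRATION_KEYS = [
  :default_task_start_to_close_timeout,
  :default_execution_start_to_close_timeout,
  :default_child_policy
].freeze

# Hypothetical helper: builds the hash describing one workflow type.
def build_registration_hash(options, domain_name, type_name, version)
  # Keep only the registration-related options that are actually set
  # (assumption: unset options are omitted).
  hash = options.select { |key, value| REGISTRATION_KEYS.include?(key) && !value.nil? }

  # Identify the workflow type within its domain.
  hash = hash.merge(domain: domain_name, name: type_name, version: version)

  # The default task list is added only when one is configured.
  if options[:default_task_list]
    hash[:default_task_list] = { name: options[:default_task_list] }
  end
  hash
end

options = {
  default_task_start_to_close_timeout: "30",
  default_child_policy: nil,
  default_task_list: "orders",
  execution_method: :process
}

workflow_hash = build_registration_hash(options, "Shop", "OrderWorkflow.process", "1.0")
p workflow_hash
```

In this model, `execution_method` never reaches the registration hash because it is not one of the listed keys.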

#register ⇒ Object

Registers this workflow with Amazon SWF.



# File 'lib/aws/decider/worker.rb', line 199

def register
  @workflow_type_options.delete_if {|workflow_type_options| workflow_type_options[:version].nil?}
  @workflow_type_options.each do |workflow_type_options|
    begin
      @service.register_workflow_type(workflow_type_options)
    rescue AWS::SimpleWorkflow::Errors::TypeAlreadyExistsFault => e
      @logger.warn "#{e.class} while trying to register workflow #{e.message} with options #{workflow_type_options}"
      # Purposefully eaten up, the alternative is to check first, and who
      # wants to do two trips when one will do?
    end
  end
end
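
The "register first, tolerate duplicates" approach in the listing above can be tried out against an in-memory stand-in. Everything here is hypothetical and for illustration only: `FakeService`, `TypeAlreadyExists`, and `register_all` are not part of the AWS SDK or the framework, and the sketch collects the skipped messages instead of logging them.

```ruby
# Hypothetical error standing in for a "type already exists" fault.
class TypeAlreadyExists < StandardError; end

# Hypothetical in-memory service; it is not the Amazon SWF client.
class FakeService
  attr_reader :registered

  def initialize
    @registered = {}
  end

  def register_workflow_type(options)
    key = [options[:name], options[:version]]
    if @registered.key?(key)
      raise TypeAlreadyExists, "#{key.join(' v')} already registered"
    end
    @registered[key] = options
  end
end

# Registers every versioned type; duplicates are recorded, not raised.
def register_all(service, type_options)
  skipped = []
  type_options.reject { |options| options[:version].nil? }.each do |options|
    begin
      service.register_workflow_type(options)
    rescue TypeAlreadyExists => e
      # One round trip per type: attempt the registration and treat
      # "already exists" as success rather than checking beforehand.
      skipped << e.message
    end
  end
  skipped
end

service = FakeService.new
types = [
  { name: "OrderWorkflow.process", version: "1.0" },
  { name: "Unversioned.run", version: nil }
]

first = register_all(service, types)
second = register_all(service, types)
p first
p second
```

Calling `register_all` a second time leaves the service unchanged, which is why a worker can safely register on every start.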

#run_once(should_register = false, poller = nil) ⇒ Object

Polls for and processes a single decision task, optionally using a supplied AWS::Flow::WorkflowTaskPoller.

Parameters:

  • poller (defaults to: nil)

    An optional AWS::Flow::WorkflowTaskPoller to use.

  • should_register (true, false) (defaults to: false)

    Indicates whether the workflow needs to be registered with Amazon SWF first. If #register was already called for this workflow worker, specify `false`.



# File 'lib/aws/decider/worker.rb', line 251

def run_once(should_register = false, poller = nil)
  register if should_register

  poller = WorkflowTaskPoller.new(
    @service,
    @domain,
    DecisionTaskHandler.new(@workflow_definition_map, @options),
    @task_list,
    @options
  ) if poller.nil?

  Kernel.exit if @shutting_down
  poller.poll_and_process_single_task
end

#set_workflow_implementation_types(workflow_implementation_types) ⇒ Object

Adds each of the given workflow implementation classes to this worker by calling #add_workflow_implementation.



# File 'lib/aws/decider/worker.rb', line 142

def set_workflow_implementation_types(workflow_implementation_types)
  workflow_implementation_types.each {|type| add_workflow_implementation(type)}
end

#start(should_register = true) ⇒ Object

Starts the worker with an AWS::Flow::WorkflowTaskPoller, polling for and processing decision tasks in an endless loop.

Parameters:

  • should_register (true, false) (defaults to: true)

    Indicates whether the workflow needs to be registered with Amazon SWF first. If #register was already called for this workflow worker, specify `false`.



# File 'lib/aws/decider/worker.rb', line 220

def start(should_register = true)
  # TODO check to make sure that the correct properties are set
  # TODO Register the domain if not already registered
  # TODO register types to poll
  # TODO Set up throttler
  # TODO Set up a timeout on the throttler correctly,
  # TODO Make this a generic poller, go to the right kind correctly

  poller = WorkflowTaskPoller.new(
    @service,
    @domain,
    DecisionTaskHandler.new(@workflow_definition_map, @options),
    @task_list,
    @options
  )

  register if should_register
  @logger.debug "Starting an infinite loop to poll and process workflow tasks."
  loop do
    run_once(false, poller)
  end
end
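
The relationship between #start and #run_once in the listing above (register once, build one poller, then reuse it for every iteration) can be modelled in a few lines. This is a sketch with hypothetical stand-ins: `FakePoller` and `ToyWorker` are not framework classes, and the fake poller raises StopIteration when its tasks run out so that Ruby's `loop` ends cleanly, whereas the real worker keeps polling indefinitely.

```ruby
# Hypothetical poller that serves tasks from an Array.
class FakePoller
  attr_reader :processed

  def initialize(tasks)
    @tasks = tasks.dup
    @processed = []
  end

  def poll_and_process_single_task
    # Kernel#loop rescues StopIteration, which ends the demo loop.
    raise StopIteration if @tasks.empty?
    @processed << @tasks.shift
  end
end

# Hypothetical, simplified worker that mirrors the start/run_once split.
class ToyWorker
  attr_reader :register_calls

  def initialize(poller)
    @poller = poller
    @register_calls = 0
  end

  def register
    @register_calls += 1
  end

  def run_once(should_register = false, poller = nil)
    register if should_register
    poller ||= @poller
    poller.poll_and_process_single_task
  end

  def start(should_register = true)
    register if should_register
    # Registration was handled above, so each iteration passes false
    # and reuses the same poller.
    loop { run_once(false, @poller) }
  end
end

poller = FakePoller.new(%w[task-1 task-2 task-3])
worker = ToyWorker.new(poller)
worker.start

p worker.register_calls
p poller.processed
```

Passing `false` to `run_once` inside the loop is what keeps registration from being repeated on every poll.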