Class: AWS::Flow::WorkflowWorker

Inherits:
GenericWorker show all
Defined in:
lib/aws/decider/worker.rb

Overview

This worker class is intended for use by the workflow implementation. It is configured with a task list and a workflow implementation. The worker class polls for decision tasks in the specified task list. When a decision task is received, it creates an instance of the workflow implementation and calls the @ execute() decorated method to process the task.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from GenericWorker

#camel_case_to_snake_case

Constructor Details

#initialize(service, domain, task_list, *args, &block) ⇒ WorkflowWorker

Creates a new WorkflowWorker instance.

Parameters:

  • service

    The service used with this workflow worker.

  • domain (String)

    The Amazon SWF domain to operate on.

  • task_list (Array)

    The default task list to put all of the decision requests.

  • args

    The decisions to use.



127
128
129
130
131
132
133
134
135
136
137
# File 'lib/aws/decider/worker.rb', line 127

def initialize(service, domain, task_list, *args, &block)
  @workflow_definition_map = {}
  @workflow_type_options = []
  @options = Utilities::interpret_block_for_options(WorkerOptions, block)

  @logger = @options.logger if @options
  @logger ||= Utilities::LogFactory.make_logger(self)
  @options.logger ||= @logger if @options

  super(service, domain, task_list, *args)
end

Instance Attribute Details

#workflow_typeObject

The workflow type for this workflow worker.



111
112
113
# File 'lib/aws/decider/worker.rb', line 111

def workflow_type
  @workflow_type
end

Instance Method Details

#add_implementation(workflow_class) ⇒ Object



143
144
145
# File 'lib/aws/decider/worker.rb', line 143

def add_implementation(workflow_class)
  add_workflow_implementation(workflow_class)
end

#add_workflow_implementation(workflow_class) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Called by #add_implementation.



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/aws/decider/worker.rb', line 149

def add_workflow_implementation(workflow_class)
  workflow_class.workflows.delete_if do |workflow_type|
    workflow_type.version.nil? || workflow_type.name.nil?
  end
  workflow_class.workflows.each do |workflow_type|
    options = workflow_type.options
    execution_method = options.execution_method
    version = workflow_type.version
    registration_options = nil
    implementation_options = nil
    get_state_method = workflow_class.get_state_method
    signals = workflow_class.signals

    @workflow_definition_map[workflow_type] = WorkflowDefinitionFactory.new(
      workflow_class,
      workflow_type,
      registration_options,
      options,
      execution_method,
      signals,
      get_state_method
    )
    # TODO should probably do something like
    # GenericWorkflowWorker#registerWorkflowTypes
    workflow_hash = options.get_options(
      [
        :default_task_start_to_close_timeout,
        :default_execution_start_to_close_timeout,
        :default_child_policy
      ], {
        :domain => @domain.name,
        :name => workflow_type.name,
        :version => version
      }
    )
    if options.default_task_list
      workflow_hash.merge!(
        {:default_task_list => {:name => options.default_task_list} }
      )
    end
    @workflow_type_options << workflow_hash
  end
end

#registerObject

Registers this workflow with Amazon SWF.



195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/aws/decider/worker.rb', line 195

def register
  @workflow_type_options.delete_if {|workflow_type_options| workflow_type_options[:version].nil?}
  @workflow_type_options.each do |workflow_type_options|
    begin
      @service.register_workflow_type(workflow_type_options)
    rescue AWS::SimpleWorkflow::Errors::TypeAlreadyExistsFault => e
      @logger.warn "#{e.class} while trying to register workflow #{e.message} with options #{workflow_type_options}"
      # Purposefully eaten up, the alternative is to check first, and who
      # wants to do two trips when one will do?
    end
  end
end

#run_once(should_register = false, poller = nil) ⇒ Object

Starts the workflow and runs it once, with an optional AWS::Flow::WorkflowTaskPoller.

Parameters:

  • poller (defaults to: nil)

    An optional AWS::Flow::WorkflowTaskPoller to use.

  • should_register (true, false) (defaults to: false)

    Indicates whether the workflow needs to be registered with Amazon SWF first. If #register was already called for this workflow worker, specify ‘false`.



247
248
249
250
251
252
253
254
255
256
257
258
259
260
# File 'lib/aws/decider/worker.rb', line 247

def run_once(should_register = false, poller = nil)
  register if should_register

  poller = WorkflowTaskPoller.new(
    @service,
    @domain,
    DecisionTaskHandler.new(@workflow_definition_map, @options),
    @task_list,
    @options
  ) if poller.nil?

  Kernel.exit if @shutting_down
  poller.poll_and_process_single_task
end

#set_workflow_implementation_types(workflow_implementation_types) ⇒ Object



139
140
141
# File 'lib/aws/decider/worker.rb', line 139

def set_workflow_implementation_types(workflow_implementation_types)
  workflow_implementation_types.each {|type| add_workflow_implementation_type(type)}
end

#start(should_register = true) ⇒ Object

Starts the workflow with a AWS::Flow::WorkflowTaskPoller.

Parameters:

  • should_register (true, false) (defaults to: true)

    Indicates whether the workflow needs to be registered with Amazon SWF first. If #register was already called for this workflow worker, specify ‘false`.



216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
# File 'lib/aws/decider/worker.rb', line 216

def start(should_register = true)
  # TODO check to make sure that the correct properties are set
  # TODO Register the domain if not already registered
  # TODO register types to poll
  # TODO Set up throttler
  # TODO Set up a timeout on the throttler correctly,
  # TODO Make this a generic poller, go to the right kind correctly

  poller = WorkflowTaskPoller.new(
    @service,
    @domain,
    DecisionTaskHandler.new(@workflow_definition_map, @options),
    @task_list,
    @options
  )

  register if should_register
  @logger.debug "Starting an infinite loop to poll and process workflow tasks."
  loop do
    run_once(false, poller)
  end
end