Class: AWS::Flow::WorkflowWorker

Inherits:
GenericWorker
Defined in:
lib/aws/decider/worker.rb

Overview

This worker class is intended for use by the workflow implementation. It is configured with a task list and a workflow implementation. The worker polls for decision tasks in the specified task list. When a decision task is received, it creates an instance of the workflow implementation and calls the method decorated with @execute() to process the task.
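The poll-and-dispatch behavior described above can be pictured with a small, self-contained sketch. It does not use the aws-flow gem: `FakeTaskList`, `GreeterWorkflow`, and `SketchWorker` are hypothetical stand-ins, and an in-memory array takes the place of an Amazon SWF task list.

```ruby
# Hypothetical stand-in for an Amazon SWF task list: an in-memory queue.
class FakeTaskList
  def initialize(tasks)
    @tasks = tasks.dup
  end

  # Returns the next pending task, or nil when the list is empty.
  def poll
    @tasks.shift
  end
end

# Hypothetical workflow implementation with a single entry-point method.
class GreeterWorkflow
  def execute(input)
    "Hello, #{input}!"
  end
end

# Simplified worker (an assumption, not the real WorkflowWorker): it is
# configured with a task list and a workflow implementation class, and it
# creates a fresh workflow instance for every task it receives.
class SketchWorker
  def initialize(task_list, workflow_class)
    @task_list = task_list
    @workflow_class = workflow_class
  end

  # Polls once; returns the workflow result, or nil if no task was waiting.
  def run_once
    task = @task_list.poll
    return nil if task.nil?
    @workflow_class.new.execute(task)
  end
end

worker = SketchWorker.new(FakeTaskList.new(["SWF"]), GreeterWorkflow)
puts worker.run_once.inspect
puts worker.run_once.inspect
```

The real worker delegates polling to an AWS::Flow::WorkflowTaskPoller and decision handling to a DecisionTaskHandler, as the method listings on this page show; the sketch only mirrors the overall shape.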

Instance Attribute Summary

Instance Method Summary

Methods inherited from GenericWorker

#camel_case_to_snake_case, #resolve_default_task_list

Constructor Details

#initialize(service, domain, task_list, *args, &block) ⇒ WorkflowWorker

Creates a new WorkflowWorker instance.

Parameters:

  • service

    The service used with this workflow worker.

  • domain (String)

    The Amazon SWF domain to operate on.

  • task_list (Array)

    The default task list to which all decision requests are sent.

  • args

    The decisions to use.



# File 'lib/aws/decider/worker.rb', line 130

def initialize(service, domain, task_list, *args, &block)
  @workflow_definition_map = {}
  @workflow_type_options = []
  @options = Utilities::interpret_block_for_options(WorkerOptions, block)

  @logger = @options.logger if @options
  @logger ||= Utilities::LogFactory.make_logger(self)
  @options.logger ||= @logger if @options

  super(service, domain, task_list, *args)
end

Instance Attribute Details

#workflow_type ⇒ Object

The workflow type for this workflow worker.



# File 'lib/aws/decider/worker.rb', line 114

def workflow_type
  @workflow_type
end

Instance Method Details

#add_implementation(workflow_class) ⇒ Object



# File 'lib/aws/decider/worker.rb', line 146

def add_implementation(workflow_class)
  add_workflow_implementation(workflow_class)
end

#add_workflow_implementation(workflow_class) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Called by #add_implementation.



# File 'lib/aws/decider/worker.rb', line 152

def add_workflow_implementation(workflow_class)
  workflow_class.workflows.delete_if do |workflow_type|
    workflow_type.version.nil? || workflow_type.name.nil?
  end

  @workflow_definition_map.merge!(
    WorkflowDefinitionFactory.generate_definition_map(workflow_class)
  )

  workflow_class.workflows.each do |workflow_type|
    # TODO should probably do something like
    # GenericWorkflowWorker#registerWorkflowTypes
    options = workflow_type.options
    workflow_hash = options.get_options(
      [
        :default_task_start_to_close_timeout,
        :default_execution_start_to_close_timeout,
        :default_child_policy,
        :default_task_priority
      ], {
        :domain => @domain.name,
        :name => workflow_type.name,
        :version => workflow_type.version
      }
    )

    if options.default_task_list
      workflow_hash.merge!(
        :default_task_list => {:name => resolve_default_task_list(options.default_task_list)}
      )
    end
    @workflow_type_options << workflow_hash
  end
end
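The filtering and hash-building steps in the listing above can be sketched in isolation. This is a simplified illustration, not the library's code: `WorkflowTypeSketch` and `build_registration_options` are hypothetical names, only the task list option is modeled, and the sketch uses a non-mutating `reject` where the original mutates the class's workflow list with `delete_if`.

```ruby
# Hypothetical stand-in for a workflow type declared on a workflow class.
WorkflowTypeSketch = Struct.new(:name, :version, :default_task_list)

# Drops types that lack a name or version, then builds one registration
# hash per remaining type.
def build_registration_options(domain_name, workflow_types)
  complete = workflow_types.reject { |type| type.name.nil? || type.version.nil? }
  complete.map do |type|
    options = { :domain => domain_name, :name => type.name, :version => type.version }
    # The task list entry is only added when the type declares one.
    options[:default_task_list] = { :name => type.default_task_list } if type.default_task_list
    options
  end
end

types = [
  WorkflowTypeSketch.new("Booking.start", "1.0", "booking_tasks"),
  WorkflowTypeSketch.new("Booking.cancel", nil, nil), # no version: skipped
  WorkflowTypeSketch.new("Booking.audit", "2.0", nil) # no default task list
]
build_registration_options("TravelDomain", types).each { |options| puts options.inspect }
```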

#register ⇒ Object

Registers this workflow with Amazon SWF.



# File 'lib/aws/decider/worker.rb', line 189

def register
  @workflow_type_options.delete_if {|workflow_type_options| workflow_type_options[:version].nil?}
  @workflow_type_options.each do |workflow_type_options|
    begin
      @service.register_workflow_type(workflow_type_options)
    rescue AWS::SimpleWorkflow::Errors::TypeAlreadyExistsFault => e
      @logger.warn "#{e.class} while trying to register workflow #{e.message} with options #{workflow_type_options}"
      # Purposefully eaten up, the alternative is to check first, and who
      # wants to do two trips when one will do?
    end
  end
end
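The rescue in the listing above makes registration safe to repeat: a type that already exists produces a warning instead of an error. A minimal sketch of that pattern follows, assuming a hypothetical in-memory `FakeRegistry` in place of the Amazon SWF client and a hypothetical `TypeAlreadyExistsSketch` error in place of AWS::SimpleWorkflow::Errors::TypeAlreadyExistsFault.

```ruby
# Hypothetical error standing in for the SWF "type already exists" fault.
class TypeAlreadyExistsSketch < StandardError; end

# Hypothetical in-memory service that rejects duplicate registrations.
class FakeRegistry
  attr_reader :registered

  def initialize
    @registered = []
  end

  def register_workflow_type(options)
    key = [options[:name], options[:version]]
    raise TypeAlreadyExistsSketch, "#{key.join(' ')} already exists" if @registered.include?(key)
    @registered << key
  end
end

# Registers every type; duplicates are collected as warnings, not raised.
def register_all(service, type_options)
  warnings = []
  type_options.each do |options|
    begin
      service.register_workflow_type(options)
    rescue TypeAlreadyExistsSketch => e
      warnings << "#{e.class}: #{e.message}"
    end
  end
  warnings
end

registry = FakeRegistry.new
options = [{ :name => "Booking.start", :version => "1.0" }]
puts register_all(registry, options).inspect # first attempt registers the type
puts register_all(registry, options).inspect # second attempt only warns
```

Attempting the registration and handling the duplicate fault takes one round trip, whereas checking for the type first would take two, which is the trade-off the comment in the original method points to.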

#run_once(should_register = false, poller = nil) ⇒ Object

Starts the workflow and runs it once, with an optional AWS::Flow::WorkflowTaskPoller.

Parameters:

  • poller (defaults to: nil)

    An optional AWS::Flow::WorkflowTaskPoller to use.

  • should_register (true, false) (defaults to: false)

    Indicates whether the workflow needs to be registered with Amazon SWF first. If #register was already called for this workflow worker, specify `false`.



# File 'lib/aws/decider/worker.rb', line 241

def run_once(should_register = false, poller = nil)
  register if should_register

  poller = WorkflowTaskPoller.new(
    @service,
    @domain,
    DecisionTaskHandler.new(@workflow_definition_map, @options),
    @task_list,
    @options
  ) if poller.nil?

  Kernel.exit if @shutting_down
  poller.poll_and_process_single_task
end

#set_workflow_implementation_types(workflow_implementation_types) ⇒ Object



# File 'lib/aws/decider/worker.rb', line 142

def set_workflow_implementation_types(workflow_implementation_types)
  workflow_implementation_types.each {|type| add_workflow_implementation(type)}
end

#start(should_register = true) ⇒ Object

Starts the workflow with an AWS::Flow::WorkflowTaskPoller.

Parameters:

  • should_register (true, false) (defaults to: true)

    Indicates whether the workflow needs to be registered with Amazon SWF first. If #register was already called for this workflow worker, specify `false`.



# File 'lib/aws/decider/worker.rb', line 210

def start(should_register = true)
  # TODO check to make sure that the correct properties are set
  # TODO Register the domain if not already registered
  # TODO register types to poll
  # TODO Set up throttler
  # TODO Set up a timeout on the throttler correctly,
  # TODO Make this a generic poller, go to the right kind correctly

  poller = WorkflowTaskPoller.new(
    @service,
    @domain,
    DecisionTaskHandler.new(@workflow_definition_map, @options),
    @task_list,
    @options
  )

  register if should_register
  @logger.debug "Starting an infinite loop to poll and process workflow tasks."
  loop do
    run_once(false, poller)
  end
end
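The relationship between #start and #run_once (register once, then reuse a single poller for every iteration) can be sketched without the aws-flow gem. `CountingPoller` and `LoopingWorkerSketch` are hypothetical stand-ins. The sketch's poller raises StopIteration so that the otherwise infinite loop ends; the real poller does not do this.

```ruby
# Hypothetical poller that counts calls and ends the loop after a limit.
class CountingPoller
  attr_reader :calls

  def initialize(limit)
    @limit = limit
    @calls = 0
  end

  def poll_and_process_single_task
    # Kernel#loop rescues StopIteration, so raising it ends the loop cleanly.
    raise StopIteration if @calls >= @limit
    @calls += 1
  end
end

# Simplified worker (an assumption, not the real WorkflowWorker).
class LoopingWorkerSketch
  attr_reader :register_count

  def initialize(poller)
    @poller = poller
    @register_count = 0
  end

  def register
    @register_count += 1
  end

  # Processes a single task, optionally registering first.
  def run_once(should_register = false, poller = @poller)
    register if should_register
    poller.poll_and_process_single_task
  end

  # Registers at most once, then polls repeatedly with the same poller.
  def start(should_register = true)
    register if should_register
    loop { run_once(false, @poller) }
  end
end

poller = CountingPoller.new(3)
worker = LoopingWorkerSketch.new(poller)
worker.start
puts "tasks processed: #{poller.calls}, registrations: #{worker.register_count}"
```

Passing `false` to `run_once` inside the loop is what keeps registration from being repeated on every iteration.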