Class: AWS::Flow::WorkflowWorker

Inherits:
GenericWorker
Defined in:
lib/aws/decider/worker.rb

Overview

This worker class hosts a workflow implementation. It is configured with a task list and a workflow implementation, and it polls the specified task list for decision tasks. When a decision task is received, the worker creates an instance of the workflow implementation and calls the @execute() decorated method to process the task.
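
The polling and dispatch flow described above can be sketched in plain Ruby. This is a minimal, self-contained model and not the framework's implementation: `ToyTaskList`, `ToyWorker`, and `GreetingWorkflow` are hypothetical stand-ins, and an ordinary `execute` method plays the role of the workflow entry point.

```ruby
# Hypothetical stand-in for a task list: a simple FIFO queue of decision tasks.
class ToyTaskList
  def initialize(tasks)
    @tasks = tasks.dup
  end

  # Returns the next task, or nil when the list is empty.
  def poll
    @tasks.shift
  end
end

# Hypothetical workflow implementation with a single entry point.
class GreetingWorkflow
  def execute(input)
    "Hello, #{input}!"
  end
end

# Toy worker: polls a task list and hands each task to a fresh workflow instance.
class ToyWorker
  def initialize(task_list, workflow_class)
    @task_list = task_list
    @workflow_class = workflow_class
  end

  # Processes at most one task; returns nil when nothing is queued.
  def run_once
    task = @task_list.poll
    return nil if task.nil?
    @workflow_class.new.execute(task)
  end
end

worker = ToyWorker.new(ToyTaskList.new(["Ada", "Grace"]), GreetingWorkflow)
results = [worker.run_once, worker.run_once, worker.run_once]
```

Each call to `run_once` in this sketch creates a new workflow instance, which mirrors the overview's statement that an instance is created per decision task.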

Constructor Details

#initialize(service, domain, task_list, *args) ⇒ WorkflowWorker

Creates a new WorkflowWorker instance.

Parameters:

  • service

    The service used with this workflow worker.

  • domain (String)

    The SWF domain to operate on.

  • task_list (Array)

    The default task list that this worker polls for decision tasks.

  • args

    The decisions to use.



# File 'lib/aws/decider/worker.rb', line 124

def initialize(service, domain, task_list, *args)
  @workflow_definition_map = {}
  @workflow_type_options = []
  super(service, domain, task_list, *args)
end

Instance Attribute Details

#workflow_type ⇒ Object

The workflow type for this workflow worker.



# File 'lib/aws/decider/worker.rb', line 108

def workflow_type
  @workflow_type
end

Instance Method Details

#add_implementation(workflow_class) ⇒ Object

Adds a workflow implementation class to this worker by delegating to add_workflow_implementation.



# File 'lib/aws/decider/worker.rb', line 136

def add_implementation(workflow_class)
  add_workflow_implementation(workflow_class)
end

#register ⇒ Object

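Registers this worker's workflow types with Amazon SWF. Workflow types without a version are skipped, and types that are already registered are silently ignored.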



# File 'lib/aws/decider/worker.rb', line 170

def register
  @workflow_type_options.delete_if {|workflow_type_options| workflow_type_options[:version].nil?}
  @workflow_type_options.each do |workflow_type_options|
    begin
      @service.register_workflow_type(workflow_type_options)
    rescue AWS::SimpleWorkflow::Errors::TypeAlreadyExistsFault => e
      # Purposefully eaten up, the alternative is to check first, and who
      # wants to do two trips when one will do?
    end
  end
end
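
The register-and-ignore-duplicates pattern used by #register can be tried in isolation. The sketch below assumes nothing about the real SWF client: `FakeService` and `TypeAlreadyExists` are hypothetical stand-ins for the service object and for AWS::SimpleWorkflow::Errors::TypeAlreadyExistsFault, and `register_all` uses a non-mutating `reject` where the original uses `delete_if`.

```ruby
# Hypothetical error standing in for TypeAlreadyExistsFault.
class TypeAlreadyExists < StandardError; end

# Hypothetical in-memory service that rejects duplicate registrations.
class FakeService
  attr_reader :registered

  def initialize
    @registered = []
  end

  def register_workflow_type(options)
    key = [options[:name], options[:version]]
    raise TypeAlreadyExists if @registered.include?(key)
    @registered << key
  end
end

# Mirrors the shape of #register: skip unversioned types, ignore duplicates.
def register_all(service, type_options)
  type_options.reject { |opts| opts[:version].nil? }.each do |opts|
    begin
      service.register_workflow_type(opts)
    rescue TypeAlreadyExists
      # Already registered; attempting and rescuing avoids a separate lookup call.
    end
  end
end

service = FakeService.new
types = [
  { name: "OrderWorkflow", version: "1.0" },
  { name: "DraftWorkflow", version: nil }
]
register_all(service, types)
register_all(service, types) # the second call changes nothing
```

Because duplicates are rescued instead of checked for up front, calling the registration step repeatedly is safe in this sketch, at the cost of hiding the fact that a type already existed.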

#run_once(should_register = false, poller = nil) ⇒ Object

Polls for and processes a single decision task, optionally using a supplied AWS::Flow::WorkflowTaskPoller. If no poller is given, a new one is created for this call.

Parameters:

  • poller (defaults to: nil)

    An optional AWS::Flow::WorkflowTaskPoller to use.

  • should_register (true, false) (defaults to: false)

    Indicates whether the workflow needs to be registered with SWF first. If #register was already called for this workflow worker, specify `false`.



# File 'lib/aws/decider/worker.rb', line 210

def run_once(should_register = false, poller = nil)
  register if should_register
  poller = WorkflowTaskPoller.new(@service, @domain, DecisionTaskHandler.new(@workflow_definition_map, @options), @task_list, @options) if poller.nil?
  Kernel.exit if @shutting_down
  poller.poll_and_process_single_task
end

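#set_workflow_implementation_types(workflow_implementation_types) ⇒ Object

Adds each of the given workflow implementation types to this worker by calling add_workflow_implementation_type on it.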



# File 'lib/aws/decider/worker.rb', line 132

def set_workflow_implementation_types(workflow_implementation_types)
  workflow_implementation_types.each {|type| add_workflow_implementation_type(type)}
end

#start(should_register = true) ⇒ Object

Starts the workflow worker with an AWS::Flow::WorkflowTaskPoller, then polls for and processes decision tasks in an endless loop.

Parameters:

  • should_register (true, false) (defaults to: true)

    Indicates whether the workflow needs to be registered with SWF first. If #register was already called for this workflow worker, specify `false`.



# File 'lib/aws/decider/worker.rb', line 189

def start(should_register = true)
  # TODO check to make sure that the correct properties are set
  # TODO Register the domain if not already registered
  # TODO register types to poll
  # TODO Set up throttler
  # TODO Set up a timeout on the throttler correctly,
  # TODO Make this a generic poller, go to the right kind correctly
  poller = WorkflowTaskPoller.new(@service, @domain, DecisionTaskHandler.new(@workflow_definition_map, @options), @task_list, @options)
  register if should_register
  loop do
    run_once(false, poller)
  end
end
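
The register-once-then-loop shape of #start can be modeled without the framework. The following is a sketch under stated assumptions: `LoopingWorker` and `CountingPoller` are hypothetical classes, and the loop exits by checking a flag, whereas the original #run_once calls Kernel.exit when the worker is shutting down.

```ruby
# Hypothetical poller that counts tasks and requests shutdown after a limit.
class CountingPoller
  attr_reader :processed

  def initialize(worker, limit)
    @worker = worker
    @limit = limit
    @processed = 0
  end

  def poll_and_process_single_task
    @processed += 1
    @worker.shutdown if @processed >= @limit
  end
end

# Toy worker following the same register-then-loop shape as #start.
class LoopingWorker
  attr_reader :register_calls
  attr_writer :poller

  def initialize
    @register_calls = 0
    @shutting_down = false
  end

  def register
    @register_calls += 1
  end

  def shutdown
    @shutting_down = true
  end

  def run_once(should_register = false)
    register if should_register
    @poller.poll_and_process_single_task
  end

  def start(should_register = true)
    register if should_register
    # Registration happens once up front, so each iteration passes false.
    run_once(false) until @shutting_down
  end
end

worker = LoopingWorker.new
poller = CountingPoller.new(worker, 3)
worker.poller = poller
worker.start
```

Passing `false` to `run_once` inside the loop keeps registration out of the polling path, which matches how #start invokes #run_once in the listing above.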