Class: AWS::Flow::WorkflowWorker

Inherits:
GenericWorker
Defined in:
lib/aws/decider/worker.rb

Overview

This worker class is intended for use by the workflow implementation. It is configured with a task list and a workflow implementation. The worker polls for decision tasks in the specified task list. When a decision task is received, it creates an instance of the workflow implementation and calls the method decorated with @execute() to process the task.
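The poll-and-dispatch cycle described above can be pictured with a small, self-contained sketch. It does not touch the real SWF client: FakeTaskList, GreeterWorkflow, and TinyWorker are hypothetical stand-ins, and a plain execute method plays the role of the decorated entry point.

```ruby
# Hypothetical stand-ins; none of these classes are part of AWS::Flow.

# An in-memory queue that plays the role of an SWF task list.
class FakeTaskList
  def initialize(tasks)
    @tasks = tasks.dup
  end

  # Returns the next pending task, or nil when the list is empty.
  def poll
    @tasks.shift
  end
end

# A workflow implementation with a single entry point.
class GreeterWorkflow
  def execute(input)
    "hello, #{input}"
  end
end

# Polls a task list and hands each task to a fresh workflow instance.
class TinyWorker
  def initialize(task_list, workflow_class)
    @task_list = task_list
    @workflow_class = workflow_class
  end

  # Processes one task; returns nil if no task was available.
  def run_once
    task = @task_list.poll
    return nil if task.nil?
    @workflow_class.new.execute(task)
  end
end

worker = TinyWorker.new(FakeTaskList.new(["alice", "bob"]), GreeterWorkflow)
results = [worker.run_once, worker.run_once, worker.run_once]
```

A new workflow instance is created per task, so no state leaks from one decision task to the next in this sketch.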

Constructor Details

#initialize(service, domain, task_list, *args) ⇒ WorkflowWorker

Creates a new WorkflowWorker instance.

Parameters:

  • service

    The service used with this workflow worker.

  • domain (String)

    The SWF domain to operate on.

  • task_list (Array)

    The default task list that this worker polls for decision tasks.

  • args

    The decisions to use.



# File 'lib/aws/decider/worker.rb', line 112

def initialize(service, domain, task_list, *args)
  @workflow_definition_map = {}
  @executor = ForkingExecutor.new(:max_workers => 2, :log_level => 5)
  @workflow_type_options = []
  super(service, domain, task_list, *args)
end

Instance Attribute Details

#workflow_type ⇒ Object

The workflow type for this workflow worker.



# File 'lib/aws/decider/worker.rb', line 96

def workflow_type
  @workflow_type
end

Instance Method Details

#add_implementation(workflow_class) ⇒ Object



# File 'lib/aws/decider/worker.rb', line 123

def add_implementation(workflow_class)
  add_workflow_implementation(workflow_class)
end

#register ⇒ Object

Registers this workflow with Amazon SWF.



# File 'lib/aws/decider/worker.rb', line 157

def register
  @workflow_type_options.delete_if {|workflow_type_options| workflow_type_options[:version].nil?}
  @workflow_type_options.each do |workflow_type_options|
    begin
      @service.register_workflow_type(workflow_type_options)
    rescue AWS::SimpleWorkflow::Errors::TypeAlreadyExistsFault => e
      # Purposefully eaten up, the alternative is to check first, and who
      # wants to do two trips when one will do?
    end
  end
end
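The register-and-ignore-duplicates pattern used by #register can be tried out in isolation. The sketch below is an assumption-laden illustration: FakeService, TypeAlreadyExists, and register_all are hypothetical names that only mimic the shape of the SWF call and its fault, and the sketch uses reject rather than the mutating delete_if shown above.

```ruby
# Hypothetical stand-ins for the SWF client and its "already exists" fault.
class TypeAlreadyExists < StandardError; end

class FakeService
  attr_reader :registered

  def initialize
    @registered = []
  end

  # Raises if the same name/version pair was registered before.
  def register_workflow_type(options)
    key = [options[:name], options[:version]]
    raise TypeAlreadyExists, key.inspect if @registered.include?(key)
    @registered << key
  end
end

# Registers every versioned type, ignoring "already exists" errors.
def register_all(service, type_options)
  type_options.reject { |opts| opts[:version].nil? }.each do |opts|
    begin
      service.register_workflow_type(opts)
    rescue TypeAlreadyExists
      # Already registered: nothing to do.
    end
  end
end

service = FakeService.new
types = [
  { :name => "Order.process", :version => "1" },
  { :name => "Order.draft", :version => nil } # skipped: no version
]
register_all(service, types)
register_all(service, types) # second call changes nothing
```

Because the duplicate error is swallowed, calling the registration step repeatedly is safe, at the cost of hiding that particular fault from the caller.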

#run_once(should_register = false, poller = nil) ⇒ Object

Polls for and processes a single decision task, optionally registering the workflow types first. Uses the supplied AWS::Flow::WorkflowTaskPoller, or creates one if none is given.

Parameters:

  • poller (defaults to: nil)

    An optional AWS::Flow::WorkflowTaskPoller to use.

  • should_register (true, false) (defaults to: false)

    Indicates whether or not the workflow needs to be registered with SWF first. If #register was already called for this workflow worker, specify `false`.



# File 'lib/aws/decider/worker.rb', line 197

def run_once(should_register = false, poller = nil)
  register if should_register
  poller = WorkflowTaskPoller.new(@service, @domain, DecisionTaskHandler.new(@workflow_definition_map, @options), @task_list, @options) if poller.nil?
  poller.poll_and_process_single_task
end
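Since run_once accepts a poller argument, a test double can be passed in place of a real poller. The following is a minimal sketch under stated assumptions: RecordingPoller and SketchWorker are hypothetical classes, and SketchWorker only mirrors the control flow of run_once (it requires a poller instead of building a default one).

```ruby
# Hypothetical test double; it mimics the one method run_once calls.
class RecordingPoller
  attr_reader :calls

  def initialize
    @calls = 0
  end

  def poll_and_process_single_task
    @calls += 1
  end
end

# Simplified stand-in that mirrors the control flow of run_once.
class SketchWorker
  attr_reader :register_count

  def initialize
    @register_count = 0
  end

  def register
    @register_count += 1
  end

  def run_once(should_register = false, poller = nil)
    register if should_register
    # The real method builds a default poller here; this sketch does not.
    raise ArgumentError, "this sketch needs a poller" if poller.nil?
    poller.poll_and_process_single_task
  end
end

poller = RecordingPoller.new
worker = SketchWorker.new
worker.run_once(false, poller) # polls without registering
worker.run_once(true, poller)  # registers, then polls
```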

#set_workflow_implementation_types(workflow_implementation_types) ⇒ Object



# File 'lib/aws/decider/worker.rb', line 119

def set_workflow_implementation_types(workflow_implementation_types)
  workflow_implementation_types.each {|type| add_workflow_implementation_type(type)}
end

#start(should_register = true) ⇒ Object

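Starts the worker: optionally registers the workflow types, then polls for and processes decision tasks in an endless loop using an AWS::Flow::WorkflowTaskPoller.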

Parameters:

  • should_register (true, false) (defaults to: true)

    Indicates whether or not the workflow needs to be registered with SWF first. If #register was already called for this workflow worker, specify `false`.



# File 'lib/aws/decider/worker.rb', line 176

def start(should_register = true)
  # TODO check to make sure that the correct properties are set
  # TODO Register the domain if not already registered
  # TODO register types to poll
  # TODO Set up throttler
  # TODO Set up a timeout on the throttler correctly,
  # TODO Make this a generic poller, go to the right kind correctly
  poller = WorkflowTaskPoller.new(@service, @domain, DecisionTaskHandler.new(@workflow_definition_map, @options), @task_list, @options)
  register if should_register
  loop do
    run_once(false, poller)
  end
end
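The register-once-then-loop shape of #start can be exercised without SWF. In this sketch FinitePoller and LoopingWorker are hypothetical stand-ins. It relies on standard Ruby behavior: Kernel#loop rescues StopIteration, so a poller that raises it ends the loop cleanly.

```ruby
# Hypothetical poller that handles a fixed number of tasks and then
# raises StopIteration, which Kernel#loop rescues to end the loop.
class FinitePoller
  attr_reader :processed

  def initialize(limit)
    @limit = limit
    @processed = 0
  end

  def poll_and_process_single_task
    raise StopIteration if @processed >= @limit
    @processed += 1
  end
end

# Simplified stand-in mirroring the shape of start:
# register once, then poll repeatedly.
class LoopingWorker
  attr_reader :registrations

  def initialize(poller)
    @poller = poller
    @registrations = 0
  end

  def register
    @registrations += 1
  end

  def start(should_register = true)
    register if should_register
    loop do
      @poller.poll_and_process_single_task
    end
  end
end

poller = FinitePoller.new(3)
worker = LoopingWorker.new(poller)
worker.start # returns once the poller raises StopIteration
```

Registration happens once, before the loop, which is why #start passes false to run_once on every iteration.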