Class: AWS::Flow::ActivityWorker

Inherits:
GenericWorker
Defined in:
lib/aws/decider/worker.rb

Overview

Use the ActivityWorker class to implement activity workers. It polls a task list for activity tasks and dispatches them to your code.

You configure the activity worker with activity implementation objects. This worker class then polls for activity tasks in the specified task list. When an activity task is received, it looks up the appropriate implementation that you provided, and calls the activity method to process the task. Unlike the WorkflowWorker, which creates a new instance for every decision task, the ActivityWorker simply uses the object you provided.
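The lookup-and-call step described above can be sketched without any AWS dependency. This is only an illustration under stated assumptions: `GreeterActivities` and `SketchActivityDispatcher` are hypothetical stand-ins, not classes from the library, and the real worker receives its tasks from Amazon SWF rather than from direct method calls.

```ruby
# Hypothetical activity implementation; it counts calls so that reuse
# of a single object across tasks is visible.
class GreeterActivities
  attr_reader :calls

  def initialize
    @calls = 0
  end

  def greet(name)
    @calls += 1
    "Hello, #{name}!"
  end
end

# Hypothetical stand-in for the worker's dispatch step (not the real ActivityWorker).
class SketchActivityDispatcher
  def initialize
    @implementations = {}
  end

  # Remember which object implements which activity name.
  def add(activity_name, instance)
    @implementations[activity_name] = instance
  end

  # Look up the implementation for a task and call the matching method on it.
  # The method name is taken to be the part after the last dot.
  def dispatch(activity_name, *input)
    instance = @implementations.fetch(activity_name)
    instance.public_send(activity_name.split(".").last, *input)
  end
end

greeter = GreeterActivities.new
dispatcher = SketchActivityDispatcher.new
dispatcher.add("GreeterActivities.greet", greeter)

puts dispatcher.dispatch("GreeterActivities.greet", "Ada")
puts dispatcher.dispatch("GreeterActivities.greet", "Grace")
puts greeter.calls # both tasks were handled by the same object
```

Because the same object handles every task, any state it holds is shared between tasks, which is the practical difference from the per-task instances that the WorkflowWorker creates.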

Constructor Details

#initialize(service, domain, task_list, *args, &block) ⇒ ActivityWorker

Creates a new ActivityWorker instance.

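Parameters:

  • service

    The Amazon SWF service object used to register activity types and poll for tasks.

  • domain

    The Amazon SWF domain that the activity types are registered in.

  • task_list

    The task list to poll for activity tasks.

  • args

    Additional arguments, passed through to GenericWorker.

  • block

    A block that is interpreted as WorkerOptions for this worker.
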
# File 'lib/aws/decider/worker.rb', line 230

def initialize(service, domain, task_list, *args, &block)
  @activity_definition_map = {}
  @executor = ForkingExecutor.new(:max_workers => 1)
  @activity_type_options = []
  @options = Utilities::interpret_block_for_options(WorkerOptions, block)
  super(service, domain, task_list, *args)
end

Instance Method Details

#add_activities_implementation(class_or_instance) ⇒ Object

Adds an Activity implementation to this ActivityWorker.

Parameters:

  • class_or_instance (Activity)

    The AWS::Flow::Activity class or instance to add.



# File 'lib/aws/decider/worker.rb', line 275

def add_activities_implementation(class_or_instance)
  klass = (class_or_instance.class == Class) ? class_or_instance : class_or_instance.class
  instance = (class_or_instance.class == Class) ? class_or_instance.new : class_or_instance
  klass.activities.each do |activity_type|

    #TODO this should assign to an activityImplementation, so that we can call execute on it later
    @activity_definition_map[activity_type] = ActivityDefinition.new(instance, activity_type.name.split(".").last, nil, activity_type.options, activity_type.options.data_converter)
    options = activity_type.options
    option_hash = {
      :domain => @domain.name,
      :name => activity_type.name.to_s,
      :version => activity_type.version
    }
    option_hash.merge!(options.get_default_options)
    option_hash.merge!(:default_task_list => {:name => options.default_task_list}) if options.default_task_list
    @activity_type_options << option_hash
  end
end
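The first two lines of the method normalize the argument into a class and an instance, and the map entry is keyed to the bare method name. A minimal sketch of just those two steps follows. `resolve_implementation`, `activity_method_name`, and `SampleActivities` are hypothetical names introduced for illustration; the sketch uses `is_a?(Class)` where the listing compares `.class == Class`.

```ruby
# Hypothetical helper: return [class, instance] for either kind of argument.
# A class is instantiated with no arguments; an instance is used as given.
def resolve_implementation(class_or_instance)
  if class_or_instance.is_a?(Class)
    [class_or_instance, class_or_instance.new]
  else
    [class_or_instance.class, class_or_instance]
  end
end

# Hypothetical helper: the method to call is the part of the activity
# type name after the last dot.
def activity_method_name(activity_type_name)
  activity_type_name.split(".").last
end

# Hypothetical activity class used only to exercise the helpers.
class SampleActivities; end

klass, instance = resolve_implementation(SampleActivities)
puts klass
puts instance.class
puts activity_method_name("SampleActivities.greet")
```

One consequence worth noting: when you pass a class, it must be constructible with no arguments, because the worker calls `new` on it without any.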

#add_implementation(class_or_instance) ⇒ Object

Adds an Activity implementation to this ActivityWorker. This method delegates to #add_activities_implementation.

Parameters:

  • class_or_instance (Activity)

    The AWS::Flow::Activity class or instance to add.



# File 'lib/aws/decider/worker.rb', line 243

def add_implementation(class_or_instance)
  add_activities_implementation(class_or_instance)
end

#register ⇒ Object

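Registers each activity type that was added to this worker. If a type with the same name and version is already registered, its registered default options are compared with the ones being registered, and an error is raised if they differ.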



# File 'lib/aws/decider/worker.rb', line 249

def register
  @activity_type_options.each do |activity_type_options|
    begin
      @service.register_activity_type(activity_type_options)
    rescue AWS::SimpleWorkflow::Errors::TypeAlreadyExistsFault => e
      previous_registration = @service.describe_activity_type(:domain => @domain.name, :activity_type => {:name => activity_type_options[:name], :version => activity_type_options[:version]})
      default_options = activity_type_options.select {|key, val| key =~ /default/}
      previous_keys = previous_registration["configuration"].keys.map {|x| camel_case_to_snake_case(x).to_sym}

      previous_registration = Hash[previous_keys.zip(previous_registration["configuration"].values)]
      if previous_registration[:default_task_list]
        previous_registration[:default_task_list][:name] = previous_registration[:default_task_list].delete("name")
      end
      registration_difference =  default_options.sort.to_a - previous_registration.sort.to_a
      raise "There is a difference between the types you have registered previously and the types you are currently registering, but you haven't changed the version. These new changes will not be picked up. In particular, these options are different #{Hash[registration_difference]}" unless registration_difference.empty?
      # Purposefully eaten up, the alternative is to check first, and who
      # wants to do two trips when one will do?
    end
  end
end
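The comparison inside the rescue branch amounts to an array difference between the requested default options and the previously registered configuration. A simplified sketch is shown below. `camel_to_snake` is a hypothetical stand-in for the library's `camel_case_to_snake_case` helper, `registration_difference` is a name invented here, and the sketch omits the special handling of the nested `default_task_list` entry.

```ruby
# Hypothetical stand-in for the library's camel_case_to_snake_case helper.
def camel_to_snake(name)
  name.gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase
end

# Return the requested default_* options that do not match the
# configuration already registered for the same name and version.
def registration_difference(requested, registered_configuration)
  # Only options whose key mentions "default" take part in the comparison.
  defaults = requested.select { |key, _value| key.to_s.include?("default") }
  # Registered keys arrive as camelCase strings; convert them to snake_case symbols.
  previous = registered_configuration.map { |key, value| [camel_to_snake(key).to_sym, value] }
  Hash[defaults.to_a - previous]
end

requested = {
  :name => "SampleActivities.greet",
  :version => "1",
  :default_task_start_to_close_timeout => "30"
}
registered = { "defaultTaskStartToCloseTimeout" => "60" }

p registration_difference(requested, registered)
```

An empty result means the registration can be skipped silently. A non-empty result means the defaults changed without a version bump, which is the case the method reports by raising.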

#run_once(should_register = true, poller = nil) ⇒ Object

Polls for and processes a single activity task. Optionally registers the activity types first, and optionally uses the ActivityTaskPoller that you supply.

Parameters:

  • should_register (true, false) (defaults to: true)

Set to false if the activity types are already registered and should not be registered again.

  • poller (ActivityTaskPoller) (defaults to: nil)

    The AWS::Flow::ActivityTaskPoller to use. If this is not set, a default ActivityTaskPoller will be created.



# File 'lib/aws/decider/worker.rb', line 316

def run_once(should_register = true, poller = nil)
  register if should_register
  poller = ActivityTaskPoller.new(@service, @domain, @task_list, @activity_definition_map, @options) if poller.nil?
  poller.poll_and_process_single_task(@options.use_forking)
end

#start(should_register = true) ⇒ Object

Starts the worker. Optionally registers the activity types, then polls for and processes activity tasks in an endless loop.

Parameters:

  • should_register (true, false) (defaults to: true)

Set to false if the activity types are already registered and should not be registered again.



# File 'lib/aws/decider/worker.rb', line 300

def start(should_register = true)
  register if should_register
  poller = ActivityTaskPoller.new(@service, @domain, @task_list, @activity_definition_map, @options)
  loop do
    run_once(false, poller)
  end
end
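The relationship between #start and #run_once (register at most once, then process one task per iteration with a shared poller) can be sketched with stand-in classes. `SketchPoller` and `SketchWorker` are hypothetical and do not talk to Amazon SWF. Unlike the real worker, which loops indefinitely, the sketch poller raises `StopIteration` when its queue is empty so that `loop` terminates.

```ruby
# Hypothetical poller that hands out queued tasks one at a time.
class SketchPoller
  attr_reader :processed

  def initialize(tasks)
    @tasks = tasks.dup
    @processed = []
  end

  def poll_and_process_single_task
    # Kernel#loop rescues StopIteration, which lets the sketch finish.
    raise StopIteration if @tasks.empty?
    @processed << @tasks.shift
  end
end

# Hypothetical worker that mirrors the control flow of start and run_once.
class SketchWorker
  attr_reader :register_calls

  def initialize(poller)
    @poller = poller
    @register_calls = 0
  end

  def register
    @register_calls += 1
  end

  # Process exactly one task, registering first unless told otherwise.
  def run_once(should_register = true, poller = @poller)
    register if should_register
    poller.poll_and_process_single_task
  end

  # Register at most once, then reuse the same poller for every task.
  def start(should_register = true)
    register if should_register
    loop { run_once(false, @poller) }
  end
end

poller = SketchPoller.new(%w[task-1 task-2 task-3])
worker = SketchWorker.new(poller)
worker.start
p poller.processed
p worker.register_calls
```

Passing `false` to `run_once` inside the loop is what keeps registration from being repeated for every task.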