Class: AWS::Flow::ActivityWorker

Inherits:
GenericWorker
Defined in:
lib/aws/decider/worker.rb

Overview

Use the ActivityWorker class to implement an activity worker that polls a task list for activity tasks.

You configure the activity worker with activity implementation objects. This worker class then polls for activity tasks in the specified task list. When an activity task is received, it looks up the appropriate implementation that you provided, and calls the activity method to process the task. Unlike the WorkflowWorker, which creates a new instance for every decision task, the ActivityWorker simply uses the object you provided.
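
The lookup-and-call behavior described above can be sketched without the library. This is a simplified illustration, not the ActivityWorker implementation: GreeterActivities and TinyDispatcher are hypothetical names invented for this example, and the real worker receives its tasks from a poller rather than from direct calls.

```ruby
# Hypothetical stand-in for an activity implementation object.
class GreeterActivities
  attr_reader :calls

  def initialize
    @calls = 0
  end

  def say_hello(name)
    @calls += 1
    "Hello, #{name}!"
  end
end

# Simplified stand-in for the worker's lookup table:
# activity name => [implementation object, method name].
class TinyDispatcher
  def initialize
    @definitions = {}
  end

  def add(instance, *method_names)
    method_names.each { |m| @definitions[m.to_s] = [instance, m] }
  end

  # Look up the implementation for a task and call its activity method.
  def process(activity_name, *input)
    instance, method_name = @definitions.fetch(activity_name)
    instance.public_send(method_name, *input)
  end
end

greeter = GreeterActivities.new
dispatcher = TinyDispatcher.new
dispatcher.add(greeter, :say_hello)

dispatcher.process("say_hello", "Ada")
dispatcher.process("say_hello", "Grace")
puts greeter.calls  # prints 2: both tasks ran on the same object
```

Because the same object handles every task, any state it holds persists between tasks, which is worth keeping in mind when writing activity classes.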

Constructor Details

#initialize(service, domain, task_list, *args, &block) ⇒ ActivityWorker

Creates a new ActivityWorker instance.

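Parameters:

  • service

    The service object, passed through to GenericWorker. The worker uses it to register activity types.

  • domain

    The domain to operate in, passed through to GenericWorker.

  • task_list

    The task list to poll for activity tasks.

  • args

    Additional arguments, passed through to GenericWorker.

  • block

    An optional block that is interpreted as WorkerOptions (for example logger, execution_workers, and use_forking). If execution_workers is unset or zero, 20 workers are used.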

# File 'lib/aws/decider/worker.rb', line 244

def initialize(service, domain, task_list, *args, &block)
  @activity_definition_map = {}
  @activity_type_options = []
  @options = Utilities::interpret_block_for_options(WorkerOptions, block)
  @logger = @options.logger if @options
  @logger ||= Utilities::LogFactory.make_logger(self, "debug")
  max_workers = @options.execution_workers if @options
  max_workers = 20 if (max_workers.nil? || max_workers.zero?)
  @executor = ForkingExecutor.new(:max_workers => max_workers, :logger => @logger)
  @shutdown_first_time_function = lambda do
    @executor.shutdown Float::INFINITY
    Kernel.exit
  end
  super(service, domain, task_list, *args)
end
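
The constructor falls back to 20 execution workers when the option is missing or zero. A minimal sketch of that rule in isolation follows; effective_max_workers and DEFAULT_MAX_WORKERS are hypothetical names used only here, not part of the library.

```ruby
DEFAULT_MAX_WORKERS = 20

# Mirrors the constructor's fallback: nil or zero means "use the default".
def effective_max_workers(configured)
  return DEFAULT_MAX_WORKERS if configured.nil? || configured.zero?
  configured
end

puts effective_max_workers(nil)  # prints 20
puts effective_max_workers(0)    # prints 20
puts effective_max_workers(5)    # prints 5
```

One consequence is that passing 0 does not disable the executor; it selects the default of 20.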

Instance Method Details

#add_activities_implementation(class_or_instance) ⇒ Object

Adds an Activity implementation to this ActivityWorker.

Parameters:

  • class_or_instance (Activity)

    The AWS::Flow::Activity class or instance to add.



# File 'lib/aws/decider/worker.rb', line 297

def add_activities_implementation(class_or_instance)
  klass = (class_or_instance.class == Class) ? class_or_instance : class_or_instance.class
  instance = (class_or_instance.class == Class) ? class_or_instance.new : class_or_instance
  klass.activities.each do |activity_type|

    #TODO this should assign to an activityImplementation, so that we can call execute on it later
    @activity_definition_map[activity_type] = ActivityDefinition.new(instance, activity_type.name.split(".").last, nil, activity_type.options, activity_type.options.data_converter)
    options = activity_type.options
    option_hash = {
      :domain => @domain.name,
      :name => activity_type.name.to_s,
      :version => activity_type.version
    }
    option_hash.merge!(options.get_default_options)
    option_hash.merge!(:default_task_list => {:name => options.default_task_list}) if options.default_task_list
    @activity_type_options << option_hash
  end
end
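
The first two lines of the method accept either a class or an instance. The sketch below isolates that step, assuming a hypothetical ImageActivities class and a hypothetical normalize helper; it uses is_a?(Class), which behaves the same as the `.class == Class` check in the listing.

```ruby
# Hypothetical activity class used only for this sketch.
class ImageActivities
  def resize; end
end

# Returns [klass, instance] the same way the method above does:
# a Class is instantiated with no arguments, an instance is used as-is.
def normalize(class_or_instance)
  if class_or_instance.is_a?(Class)
    [class_or_instance, class_or_instance.new]
  else
    [class_or_instance.class, class_or_instance]
  end
end

klass, instance = normalize(ImageActivities)

existing = ImageActivities.new
klass2, instance2 = normalize(existing)
puts instance2.equal?(existing)  # prints true: the supplied object is reused

# The method to call is the last dot-separated segment of the activity type name.
method_name = "ImageActivities.resize".split(".").last
puts method_name  # prints resize
```

Passing a class only works if its constructor takes no required arguments, since the listing calls new without any.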

#add_implementation(class_or_instance) ⇒ Object

Adds an Activity implementation to this ActivityWorker. This method delegates to add_activities_implementation.

Parameters:

  • class_or_instance (Activity)

    The AWS::Flow::Activity class or instance to add.



# File 'lib/aws/decider/worker.rb', line 265

def add_implementation(class_or_instance)
  add_activities_implementation(class_or_instance)
end

#register ⇒ Object

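Registers the activity types that were added to this worker. If a type with the same name and version is already registered, the new default options are compared with the existing registration, and an error is raised if they differ.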



# File 'lib/aws/decider/worker.rb', line 271

def register
  @activity_type_options.each do |activity_type_options|
    begin
      @service.register_activity_type(activity_type_options)
    rescue AWS::SimpleWorkflow::Errors::TypeAlreadyExistsFault => e
      previous_registration = @service.describe_activity_type(:domain => @domain.name, :activity_type => {:name => activity_type_options[:name], :version => activity_type_options[:version]})
      default_options = activity_type_options.select {|key, val| key =~ /default/}
      previous_keys = previous_registration["configuration"].keys.map {|x| camel_case_to_snake_case(x).to_sym}

      previous_registration = Hash[previous_keys.zip(previous_registration["configuration"].values)]
      if previous_registration[:default_task_list]
        previous_registration[:default_task_list][:name] = previous_registration[:default_task_list].delete("name")
      end
      registration_difference =  default_options.sort.to_a - previous_registration.sort.to_a
      raise "There is a difference between the types you have registered previously and the types you are currently registering, but you haven't changed the version. These new changes will not be picked up. In particular, these options are different #{Hash[registration_difference]}" unless registration_difference.empty?
      # Purposefully eaten up, the alternative is to check first, and who
      # wants to do two trips when one will do?
    end
  end
end
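
The comparison inside the rescue branch can be tried on plain hashes. This is a sketch under stated assumptions: camel_to_snake is a hypothetical stand-in for the library's camel_case_to_snake_case (whose source is not shown in this section), and the option names and the shape of previous_configuration are illustrative values, not a captured service response.

```ruby
# Hypothetical helper standing in for camel_case_to_snake_case.
def camel_to_snake(name)
  name.gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase
end

# Options the worker wants to register. Only keys containing "default" are compared.
new_options = {
  :name => "ImageActivities.resize",
  :version => "1.0",
  :default_task_start_to_close_timeout => "60",
  :default_task_heartbeat_timeout => "30"
}

# Assumed shape of the "configuration" part of an existing registration.
previous_configuration = {
  "defaultTaskStartToCloseTimeout" => "60",
  "defaultTaskHeartbeatTimeout" => "NONE"
}

default_options = new_options.select { |key, _| key =~ /default/ }
previous = Hash[previous_configuration.map { |k, v| [camel_to_snake(k).to_sym, v] }]

# Array difference keeps the new [key, value] pairs with no identical pair in the old registration.
difference = Hash[default_options.sort.to_a - previous.sort.to_a]
puts difference.inspect
```

Here only the heartbeat timeout ends up in the difference, so the listing above would raise. Note that the subtraction is one-directional: an option that exists only in the previous registration is not reported.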

#run_once(should_register = true, poller = nil) ⇒ Object

Polls for and processes a single activity task. Optionally registers the activity types first, and optionally uses the supplied ActivityTaskPoller instead of creating one.

Parameters:

  • should_register (true, false) (defaults to: true)

    Set to false if the activity types should not be registered (for example, because they are already registered).

  • poller (ActivityTaskPoller) (defaults to: nil)

    The AWS::Flow::ActivityTaskPoller to use. If this is not set, a default ActivityTaskPoller will be created.



# File 'lib/aws/decider/worker.rb', line 338

def run_once(should_register = true, poller = nil)
  register if should_register
  poller = ActivityTaskPoller.new(@service, @domain, @task_list, @activity_definition_map, @executor, @options) if poller.nil?
  poller.poll_and_process_single_task(@options.use_forking)
end

#start(should_register = true) ⇒ Object

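Starts the worker. Optionally registers the activity types, then polls for and processes activity tasks in an endless loop, reusing a single ActivityTaskPoller.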

Parameters:

  • should_register (true, false) (defaults to: true)

    Set to false if the activity types should not be registered (for example, because they are already registered).



# File 'lib/aws/decider/worker.rb', line 322

def start(should_register = true)
  register if should_register
  poller = ActivityTaskPoller.new(@service, @domain, @task_list, @activity_definition_map, @executor, @options)
  loop do
    run_once(false, poller)
  end
end
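
The relationship between start and run_once (register once, build one poller, then call run_once(false, poller) repeatedly) can be sketched with in-memory stand-ins. FakePoller, TinyWorker, and drain are hypothetical names for this illustration. Unlike start, drain stops when the queue is empty so that the example terminates.

```ruby
# Hypothetical poller that serves tasks from an in-memory queue.
class FakePoller
  attr_reader :processed

  def initialize(tasks)
    @tasks = tasks.dup
    @processed = []
  end

  # Handles at most one task; returns false when there was nothing to do.
  def poll_and_process_single_task
    task = @tasks.shift
    return false if task.nil?
    @processed << task
    true
  end
end

# Simplified worker: run_once reuses a supplied poller or builds a default one.
class TinyWorker
  attr_reader :register_calls

  def initialize(tasks)
    @tasks = tasks
    @register_calls = 0
  end

  def register
    @register_calls += 1
  end

  def run_once(should_register = true, poller = nil)
    register if should_register
    poller ||= FakePoller.new(@tasks)
    poller.poll_and_process_single_task
  end

  # Like start, but stops when the queue is empty instead of looping forever.
  def drain(should_register = true)
    register if should_register
    poller = FakePoller.new(@tasks)
    loop { break unless run_once(false, poller) }
    poller
  end
end

worker = TinyWorker.new(%w[task-a task-b task-c])
poller = worker.drain
puts poller.processed.length  # prints 3
puts worker.register_calls    # prints 1: registration happens once, not per task
```

Passing false to run_once inside the loop is what keeps registration from being repeated for every task.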