Class: AWS::Flow::ActivityWorker

Inherits:
GenericWorker show all
Defined in:
lib/aws/decider/worker.rb

Overview

Used to implement an activity worker. You can use the ‘ActivityWorker` class to conveniently poll a task list for activity tasks.

You configure the activity worker with activity implementation objects. This worker class then polls for activity tasks in the specified task list. When an activity task is received, it looks up the appropriate implementation that you provided, and calls the activity method to process the task. Unlike the WorkflowWorker, which creates a new instance for every decision task, the ‘ActivityWorker` simply uses the object you provided.

Instance Method Summary collapse

Methods inherited from GenericWorker

#camel_case_to_snake_case, #resolve_default_task_list

Constructor Details

#initialize(service, domain, task_list, *args, &block) ⇒ ActivityWorker

Creates a new ‘ActivityWorker` instance.

Parameters:



285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
# File 'lib/aws/decider/worker.rb', line 285

def initialize(service, domain, task_list, *args, &block)
  @activity_definition_map = {}
  @activity_type_options = []
  @options = Utilities::interpret_block_for_options(WorkerOptions, block)

  @logger = @options.logger if @options
  @logger ||= Utilities::LogFactory.make_logger(self)
  @options.logger ||= @logger if @options

  max_workers = @options.execution_workers if @options
  max_workers = 20 if (max_workers.nil? || max_workers.zero?)
  @executor = ForkingExecutor.new(
    :max_workers => max_workers,
    :logger => @logger
  )
  @shutdown_first_time_function = lambda do
    @executor.shutdown Float::INFINITY
    Kernel.exit
  end
  super(service, domain, task_list, *args)
end

Instance Method Details

#add_activities_implementation(class_or_instance) ⇒ Object

Adds an activity implementation to this ‘ActivityWorker`.

Parameters:



354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
# File 'lib/aws/decider/worker.rb', line 354

def add_activities_implementation(class_or_instance)
  klass = (class_or_instance.class == Class) ? class_or_instance : class_or_instance.class
  instance = (class_or_instance.class == Class) ? class_or_instance.new : class_or_instance
  klass.activities.each do |activity_type|

    # TODO this should assign to an activityImplementation, so that we can
    # call execute on it later
    @activity_definition_map[activity_type] = ActivityDefinition.new(
      instance,
      activity_type.name.split(".").last,
      nil,
      activity_type.options,
      activity_type.options.data_converter
    )
    options = activity_type.options
    option_hash = {
      :domain => @domain.name,
      :name => activity_type.name.to_s,
      :version => activity_type.version
    }

    option_hash.merge!(options.get_registration_options)

    if options.default_task_list
      option_hash.merge!(
        :default_task_list => {:name => resolve_default_task_list(options.default_task_list)}
      )
    end

    @activity_type_options << option_hash
  end
end

#add_implementation(class_or_instance) ⇒ Object

Adds an activity implementation to this ‘ActivityWorker`.

Parameters:



312
313
314
# File 'lib/aws/decider/worker.rb', line 312

def add_implementation(class_or_instance)
  add_activities_implementation(class_or_instance)
end

#registerObject

Registers the activity type.



318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
# File 'lib/aws/decider/worker.rb', line 318

def register
  @activity_type_options.each do |activity_type_options|
    begin
      @service.register_activity_type(activity_type_options)
    rescue AWS::SimpleWorkflow::Errors::TypeAlreadyExistsFault => e
      @logger.warn "#{e.class} while trying to register activity #{e.message} with options #{activity_type_options}"
      previous_registration = @service.describe_activity_type(
        :domain => @domain.name,
        :activity_type => {
          :name => activity_type_options[:name],
          :version => activity_type_options[:version]
        }
      )
      default_options = activity_type_options.select { |key, val| key =~ /default/}
      previous_keys = previous_registration["configuration"].keys.map {|x| camel_case_to_snake_case(x).to_sym}

      previous_registration = Hash[previous_keys.zip(previous_registration["configuration"].values)]
      if previous_registration[:default_task_list]
        previous_registration[:default_task_list][:name] = previous_registration[:default_task_list].delete("name")
      end
      registration_difference =  default_options.sort.to_a - previous_registration.sort.to_a

      unless registration_difference.empty?
        raise "There is a difference between the types you have registered previously and the types you are currently registering, but you haven't changed the version. These new changes will not be picked up. In particular, these options are different #{Hash[registration_difference]}"
      end
      # Purposefully eaten up, the alternative is to check first, and who
      # wants to do two trips when one will do?
    end
  end
end

#run_once(should_register = true, poller = nil) ⇒ Object

Starts the activity that was added to the ‘ActivityWorker` and, optionally, sets the AWS::Flow::ActivityTaskPoller.

Parameters:



423
424
425
426
427
428
429
430
431
432
433
434
435
436
# File 'lib/aws/decider/worker.rb', line 423

def run_once(should_register = true, poller = nil)
  register if should_register
  poller = ActivityTaskPoller.new(
    @service,
    @domain,
    @task_list,
    @activity_definition_map,
    @executor,
    @options
  ) if poller.nil?

  Kernel.exit if @shutting_down
  poller.poll_and_process_single_task(@options.use_forking)
end

#start(should_register = true) ⇒ Object

Starts the activity that was added to the ‘ActivityWorker`.

Parameters:

  • should_register (true, false) (defaults to: true)

    Set to ‘false` if the activity should not register itself (it is already registered).



394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
# File 'lib/aws/decider/worker.rb', line 394

def start(should_register = true)

  register if should_register
  poller = ActivityTaskPoller.new(
    @service,
    @domain,
    @task_list,
    @activity_definition_map,
    @executor,
    @options
  )

  @logger.debug "Starting an infinite loop to poll and process activity tasks."
  loop do
    run_once(false, poller)
  end
end