Class: AWS::Flow::ActivityWorker

Inherits:
GenericWorker show all
Defined in:
lib/aws/decider/worker.rb

Overview

Used to implement an activity worker. You can use the ‘ActivityWorker` class to conveniently poll a task list for activity tasks.

You configure the activity worker with activity implementation objects. This worker class then polls for activity tasks in the specified task list. When an activity task is received, it looks up the appropriate implementation that you provided, and calls the activity method to process the task. Unlike the WorkflowWorker, which creates a new instance for every decision task, the ‘ActivityWorker` simply uses the object you provided.

Instance Method Summary collapse

Methods inherited from GenericWorker

#camel_case_to_snake_case, #resolve_default_task_list

Constructor Details

#initialize(service, domain, task_list, *args, &block) ⇒ ActivityWorker

Creates a new ‘ActivityWorker` instance.

Parameters:



295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
# File 'lib/aws/decider/worker.rb', line 295

def initialize(service, domain, task_list, *args, &block)
  @activity_definition_map = {}
  @activity_type_options = []
  @options = Utilities::interpret_block_for_options(WorkerOptions, block)

  @logger = @options.logger if @options
  @logger ||= Utilities::LogFactory.make_logger(self)
  @options.logger ||= @logger if @options

  max_workers = @options.execution_workers if @options
  max_workers = 20 if (max_workers.nil? || max_workers.zero?)
  @executor = ForkingExecutor.new(
    :max_workers => max_workers,
    :logger => @logger
  )
  @shutdown_first_time_function = lambda do
    @executor.shutdown Float::INFINITY
    Kernel.exit
  end
  super(service, domain, task_list, *args)
end

Instance Method Details

#add_activities_implementation(class_or_instance) ⇒ Object

Adds an activity implementation to this ‘ActivityWorker`.

Parameters:



364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
# File 'lib/aws/decider/worker.rb', line 364

def add_activities_implementation(class_or_instance)
  klass = (class_or_instance.class == Class) ? class_or_instance : class_or_instance.class
  instance = (class_or_instance.class == Class) ? class_or_instance.new : class_or_instance
  klass.activities.each do |activity_type|

    # TODO this should assign to an activityImplementation, so that we can
    # call execute on it later
    @activity_definition_map[activity_type] = ActivityDefinition.new(
      instance,
      activity_type.name.split(".").last,
      nil,
      activity_type.options,
      activity_type.options.data_converter
    )
    options = activity_type.options
    option_hash = {
      :domain => @domain.name,
      :name => activity_type.name.to_s,
      :version => activity_type.version
    }

    option_hash.merge!(options.get_registration_options)

    if options.default_task_list
      option_hash.merge!(
        :default_task_list => {:name => resolve_default_task_list(options.default_task_list)}
      )
    end

    @activity_type_options << option_hash
  end
end

#add_implementation(class_or_instance) ⇒ Object

Adds an activity implementation to this ‘ActivityWorker`.

Parameters:



322
323
324
# File 'lib/aws/decider/worker.rb', line 322

def add_implementation(class_or_instance)
  add_activities_implementation(class_or_instance)
end

#registerObject

Registers the activity type.



328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
# File 'lib/aws/decider/worker.rb', line 328

def register
  @activity_type_options.each do |activity_type_options|
    begin
      @service.register_activity_type(activity_type_options)
    rescue AWS::SimpleWorkflow::Errors::TypeAlreadyExistsFault => e
      @logger.warn "#{e.class} while trying to register activity #{e.message} with options #{activity_type_options}"
      previous_registration = @service.describe_activity_type(
        :domain => @domain.name,
        :activity_type => {
          :name => activity_type_options[:name],
          :version => activity_type_options[:version]
        }
      )
      default_options = activity_type_options.select { |key, val| key =~ /default/}
      previous_keys = previous_registration["configuration"].keys.map {|x| camel_case_to_snake_case(x).to_sym}

      previous_registration = Hash[previous_keys.zip(previous_registration["configuration"].values)]
      if previous_registration[:default_task_list]
        previous_registration[:default_task_list][:name] = previous_registration[:default_task_list].delete("name")
      end
      registration_difference =  default_options.sort.to_a - previous_registration.sort.to_a

      unless registration_difference.empty?
        raise "There is a difference between the types you have registered previously and the types you are currently registering, but you haven't changed the version. These new changes will not be picked up. In particular, these options are different #{Hash[registration_difference]}"
      end
      # Purposefully eaten up, the alternative is to check first, and who
      # wants to do two trips when one will do?
    end
  end
end

#run_once(should_register = true, poller = nil) ⇒ Object

Starts the activity that was added to the ‘ActivityWorker` and, optionally, sets the AWS::Flow::ActivityTaskPoller.

Parameters:



433
434
435
436
437
438
439
440
441
442
443
444
445
446
# File 'lib/aws/decider/worker.rb', line 433

def run_once(should_register = true, poller = nil)
  register if should_register
  poller = ActivityTaskPoller.new(
    @service,
    @domain,
    @task_list,
    @activity_definition_map,
    @executor,
    @options
  ) if poller.nil?

  Kernel.exit if @shutting_down
  poller.poll_and_process_single_task(@options.use_forking)
end

#start(should_register = true) ⇒ Object

Starts the activity that was added to the ‘ActivityWorker`.

Parameters:

  • should_register (true, false) (defaults to: true)

    Set to ‘false` if the activity should not register itself (it is already registered).



404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
# File 'lib/aws/decider/worker.rb', line 404

def start(should_register = true)

  register if should_register
  poller = ActivityTaskPoller.new(
    @service,
    @domain,
    @task_list,
    @activity_definition_map,
    @executor,
    @options
  )

  @logger.debug "Starting an infinite loop to poll and process activity tasks."
  loop do
    run_once(false, poller)
  end
end