Class: AWS::Flow::ActivityWorker

Inherits:
GenericWorker show all
Defined in:
lib/aws/decider/worker.rb

Overview

Used to implement an activity worker. You can use the `ActivityWorker` class to conveniently poll a task list for activity tasks.

You configure the activity worker with activity implementation objects. This worker class then polls for activity tasks in the specified task list. When an activity task is received, it looks up the appropriate implementation that you provided, and calls the activity method to process the task. Unlike the WorkflowWorker, which creates a new instance for every decision task, the `ActivityWorker` simply uses the object you provided.

Instance Method Summary collapse

Methods inherited from GenericWorker

#camel_case_to_snake_case, #resolve_default_task_list

Constructor Details

#initialize(service, domain, task_list, *args, &block) ⇒ ActivityWorker

Creates a new `ActivityWorker` instance.

Parameters:



285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
# File 'lib/aws/decider/worker.rb', line 285

# Creates a new `ActivityWorker` instance.
#
# Builds the worker options from the given block, settles on a logger and a
# worker count, creates the executor, and installs the first-time shutdown
# hook before handing the remaining arguments to the superclass.
#
# @param service   forwarded unchanged to the superclass constructor.
# @param domain    forwarded unchanged to the superclass constructor.
# @param task_list forwarded unchanged to the superclass constructor.
# @param args      extra positional arguments, forwarded to the superclass.
# @param block     optional block passed to
#   `Utilities::interpret_block_for_options` to produce the `WorkerOptions`.
def initialize(service, domain, task_list, *args, &block)
  @activity_definition_map = {}
  @activity_type_options = []
  @options = Utilities::interpret_block_for_options(WorkerOptions, block)

  worker_count = nil
  if @options
    # Keep the worker and its options object pointing at the same logger.
    @logger = @options.logger || Utilities::LogFactory.make_logger(self)
    @options.logger ||= @logger

    # On Windows, an unset worker count defaults to 0.
    if AWS::Flow.on_windows?
      @options.execution_workers ||= 0
    end

    worker_count = @options.execution_workers
    # A worker count of 0 switches forking off.
    if worker_count && worker_count.zero?
      @options.use_forking = false
    end
  end
  # Nothing configured: fall back to 20 workers.
  worker_count = 20 if worker_count.nil?

  @executor = ForkingExecutor.new(:max_workers => worker_count, :logger => @logger)

  # Invoked on the first shutdown request: shut the executor down with an
  # infinite timeout argument, then exit the process.
  @shutdown_first_time_function = lambda {
    @executor.shutdown Float::INFINITY
    Kernel.exit
  }
  super(service, domain, task_list, *args)
end

Instance Method Details

#add_activities_implementation(class_or_instance) ⇒ Object

Adds an activity implementation to this `ActivityWorker`.

Parameters:



361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
# File 'lib/aws/decider/worker.rb', line 361

# Adds an activity implementation to this `ActivityWorker`.
#
# For every activity type exposed by the implementation's class, stores an
# `ActivityDefinition` in the definition map and appends a registration
# option hash (domain, name, version, registration options and, when set, the
# resolved default task list) to the list consumed by #register.
#
# @param class_or_instance either a class, which is instantiated with `new`,
#   or an already-built instance, which is used as is.
# @return the collection returned by the class's `activities` method.
def add_activities_implementation(class_or_instance)
  if class_or_instance.class == Class
    activity_class = class_or_instance
    implementation = class_or_instance.new
  else
    activity_class = class_or_instance.class
    implementation = class_or_instance
  end

  activity_class.activities.each do |type|
    # TODO this should assign to an activityImplementation, so that we can
    # call execute on it later
    # Only the segment after the last "." of the type name is passed on.
    short_name = type.name.split(".").last
    @activity_definition_map[type] = ActivityDefinition.new(
      implementation, short_name, nil, type.options, type.options.data_converter
    )

    type_options = type.options
    registration = {
      :domain => @domain.name,
      :name => type.name.to_s,
      :version => type.version
    }
    registration.merge!(type_options.get_registration_options)

    if type_options.default_task_list
      registration[:default_task_list] = {
        :name => resolve_default_task_list(type_options.default_task_list)
      }
    end

    @activity_type_options << registration
  end
end

#add_implementation(class_or_instance) ⇒ Object

Adds an activity implementation to this `ActivityWorker`.

Parameters:



319
320
321
# File 'lib/aws/decider/worker.rb', line 319

# Adds an activity implementation to this worker by delegating directly to
# #add_activities_implementation.
#
# @param class_or_instance the activity class or instance, forwarded unchanged.
# @return whatever #add_activities_implementation returns.
def add_implementation(class_or_instance)
  add_activities_implementation(class_or_instance)
end

#register ⇒ Object

Registers the activity type.



325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
# File 'lib/aws/decider/worker.rb', line 325

# Registers every activity type collected in `@activity_type_options`.
#
# When the service reports that a type already exists, the existing
# registration is fetched and its configuration is compared with the
# `default`-prefixed options being registered now. Any option that is not
# present with the same value in the existing configuration is treated as a
# conflict.
#
# @return the `@activity_type_options` collection (the result of `each`).
# @raise [RuntimeError] if an already-registered type has different default
#   options but the same version.
def register
  @activity_type_options.each do |type_options|
    begin
      @service.register_activity_type(type_options)
    rescue AWS::SimpleWorkflow::Errors::TypeAlreadyExistsFault => fault
      @logger.warn "#{fault.class} while trying to register activity #{fault.message} with options #{type_options}"
      described = @service.describe_activity_type(
        :domain => @domain.name,
        :activity_type => {
          :name => type_options[:name],
          :version => type_options[:version]
        }
      )

      # Only options whose key mentions "default" take part in the comparison.
      requested_defaults = type_options.select { |key, _| key =~ /default/ }

      # Re-key the service's configuration with snake_case symbols so it can
      # be compared with the locally built options.
      configuration = described["configuration"]
      registered = Hash[
        configuration.map { |key, value| [camel_case_to_snake_case(key).to_sym, value] }
      ]

      # The nested task list uses a string "name" key; switch it to a symbol.
      registered_task_list = registered[:default_task_list]
      if registered_task_list
        registered_task_list[:name] = registered_task_list.delete("name")
      end

      difference = requested_defaults.sort - registered.sort

      # Purposefully eaten up, the alternative is to check first, and who
      # wants to do two trips when one will do?
      next if difference.empty?

      raise "Activity [#{type_options[:name]}]: There is a difference between the types you have registered previously and the types you are currently registering, but you haven't changed the version. These new changes will not be picked up. In particular, these options are different #{Hash[difference]}"
    end
  end
end

#run_once(should_register = true, poller = nil) ⇒ Object

Starts the activity that was added to the `ActivityWorker` and, optionally, sets the AWS::Flow::ActivityTaskPoller.

Parameters:



430
431
432
433
434
435
436
437
438
439
440
441
442
443
# File 'lib/aws/decider/worker.rb', line 430

# Polls for and processes a single activity task.
#
# @param should_register [true, false] when truthy, #register is called
#   before polling.
# @param poller the poller to use; when nil, a new `ActivityTaskPoller` is
#   built from this worker's service, domain, task list, definition map,
#   executor and options.
# @return the result of the poller's `poll_and_process_single_task`.
def run_once(should_register = true, poller = nil)
  register if should_register

  if poller.nil?
    poller = ActivityTaskPoller.new(
      @service, @domain, @task_list, @activity_definition_map, @executor, @options
    )
  end

  # Exit the process instead of polling when the shutting-down flag is set.
  Kernel.exit if @shutting_down
  poller.poll_and_process_single_task(@options.use_forking)
end

#start(should_register = true) ⇒ Object

Starts the activity that was added to the `ActivityWorker`.

Parameters:

  • should_register (true, false) (defaults to: true)

    Set to `false` if the activity should not register itself (it is already registered).



401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
# File 'lib/aws/decider/worker.rb', line 401

# Starts polling for activity tasks in an endless loop.
#
# A single `ActivityTaskPoller` is created up front and reused for every
# iteration; each iteration calls #run_once with registration disabled.
#
# @param should_register [true, false] when truthy, #register is called once
#   before the loop begins.
def start(should_register = true)
  register if should_register

  task_poller = ActivityTaskPoller.new(
    @service, @domain, @task_list, @activity_definition_map, @executor, @options
  )

  @logger.debug "Starting an infinite loop to poll and process activity tasks."
  loop { run_once(false, task_poller) }
end