Class: AWS::Flow::ActivityWorker
- Inherits:
-
GenericWorker
- Object
- GenericWorker
- AWS::Flow::ActivityWorker
- Defined in:
- lib/aws/decider/worker.rb
Overview
For implementing activity workers, you can use the ActivityWorker class to conveniently poll a task list for activity tasks.
You configure the activity worker with activity implementation objects. This worker class then polls for activity tasks in the specified task list. When an activity task is received, it looks up the appropriate implementation that you provided, and calls the activity method to process the task. Unlike the WorkflowWorker, which creates a new instance for every decision task, the ActivityWorker simply uses the object you provided.
Instance Method Summary collapse
-
#add_activities_implementation(class_or_instance) ⇒ Object
Adds an Activity implementation to this ActivityWorker.
-
#add_implementation(class_or_instance) ⇒ Object
Adds an Activity implementation to this ActivityWorker.
-
#initialize(service, domain, task_list, *args, &block) ⇒ ActivityWorker
constructor
Creates a new ActivityWorker instance.
-
#register ⇒ Object
Registers the activity type.
-
#run_once(should_register = true, poller = nil) ⇒ Object
Starts the Activity that was added to the ActivityWorker and, optionally, sets the ActivityTaskPoller.
-
#start(should_register = true) ⇒ Object
Starts the Activity that was added to the ActivityWorker.
Constructor Details
#initialize(service, domain, task_list, *args, &block) ⇒ ActivityWorker
Creates a new ActivityWorker instance.
244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 |
# File 'lib/aws/decider/worker.rb', line 244 def initialize(service, domain, task_list, *args, &block) @activity_definition_map = {} @activity_type_options = [] @options = Utilities::(WorkerOptions, block) @logger = @options.logger if @options @logger ||= Utilities::LogFactory.make_logger(self, "debug") max_workers = @options.execution_workers if @options max_workers = 20 if (max_workers.nil? || max_workers.zero?) @executor = ForkingExecutor.new(:max_workers => max_workers, :logger => @logger) @shutdown_first_time_function = lambda do @executor.shutdown Float::INFINITY Kernel.exit end super(service, domain, task_list, *args) end |
Instance Method Details
#add_activities_implementation(class_or_instance) ⇒ Object
Adds an Activity implementation to this ActivityWorker.
297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 |
# File 'lib/aws/decider/worker.rb', line 297 def add_activities_implementation(class_or_instance) klass = (class_or_instance.class == Class) ? class_or_instance : class_or_instance.class instance = (class_or_instance.class == Class) ? class_or_instance.new : class_or_instance klass.activities.each do |activity_type| #TODO this should assign to an activityImplementation, so that we can call execute on it later @activity_definition_map[activity_type] = ActivityDefinition.new(instance, activity_type.name.split(".").last, nil, activity_type., activity_type..data_converter) = activity_type. option_hash = { :domain => @domain.name, :name => activity_type.name.to_s, :version => activity_type.version } option_hash.merge!(.) option_hash.merge!(:default_task_list => {:name => .default_task_list}) if .default_task_list @activity_type_options << option_hash end end |
#add_implementation(class_or_instance) ⇒ Object
Adds an Activity implementation to this ActivityWorker.
265 266 267 |
# File 'lib/aws/decider/worker.rb', line 265 def add_implementation(class_or_instance) add_activities_implementation(class_or_instance) end |
#register ⇒ Object
Registers the activity type
271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 |
# File 'lib/aws/decider/worker.rb', line 271 def register @activity_type_options.each do || begin @service.register_activity_type() rescue AWS::SimpleWorkflow::Errors::TypeAlreadyExistsFault => e previous_registration = @service.describe_activity_type(:domain => @domain.name, :activity_type => {:name => [:name], :version => [:version]}) = .select {|key, val| key =~ /default/} previous_keys = previous_registration["configuration"].keys.map {|x| camel_case_to_snake_case(x).to_sym} previous_registration = Hash[previous_keys.zip(previous_registration["configuration"].values)] if previous_registration[:default_task_list] previous_registration[:default_task_list][:name] = previous_registration[:default_task_list].delete("name") end registration_difference = .sort.to_a - previous_registration.sort.to_a raise "There is a difference between the types you have registered previously and the types you are currently registering, but you haven't changed the version. These new changes will not be picked up. In particular, these options are different #{Hash[registration_difference]}" unless registration_difference.empty? # Purposefully eaten up, the alternative is to check first, and who # wants to do two trips when one will do? end end end |
#run_once(should_register = true, poller = nil) ⇒ Object
Starts the Activity that was added to the ActivityWorker and, optionally, sets the ActivityTaskPoller.
338 339 340 341 342 |
# File 'lib/aws/decider/worker.rb', line 338 def run_once(should_register = true, poller = nil) register if should_register poller = ActivityTaskPoller.new(@service, @domain, @task_list, @activity_definition_map, @executor, @options) if poller.nil? poller.poll_and_process_single_task(@options.use_forking) end |
#start(should_register = true) ⇒ Object
Starts the Activity that was added to the ActivityWorker
322 323 324 325 326 327 328 |
# File 'lib/aws/decider/worker.rb', line 322 def start(should_register = true) register if should_register poller = ActivityTaskPoller.new(@service, @domain, @task_list, @activity_definition_map, @executor, @options) loop do run_once(false, poller) end end |