Class: AWS::Flow::ActivityWorker
- Inherits:
-
GenericWorker
- Object
- GenericWorker
- AWS::Flow::ActivityWorker
- Defined in:
- lib/aws/decider/worker.rb
Overview
For implementing activity workers, you can use the ActivityWorker class to conveniently poll a task list for activity tasks.
You configure the activity worker with activity implementation objects. This worker class then polls for activity tasks in the specified task list. When an activity task is received, it looks up the appropriate implementation that you provided, and calls the activity method to process the task. Unlike the WorkflowWorker, which creates a new instance for every decision task, the ActivityWorker simply uses the object you provided.
Instance Method Summary collapse
-
#add_activities_implementation(class_or_instance) ⇒ Object
Adds an Activity implementation to this ActivityWorker.
-
#add_implementation(class_or_instance) ⇒ Object
Adds an Activity implementation to this ActivityWorker.
-
#initialize(service, domain, task_list, *args, &block) ⇒ ActivityWorker
constructor
Creates a new ActivityWorker instance.
-
#register ⇒ Object
Registers the activity type.
-
#run_once(should_register = true, poller = nil) ⇒ Object
Starts the Activity that was added to the ActivityWorker and, optionally, sets the ActivityTaskPoller.
-
#start(should_register = true) ⇒ Object
Starts the Activity that was added to the ActivityWorker.
Constructor Details
#initialize(service, domain, task_list, *args, &block) ⇒ ActivityWorker
Creates a new ActivityWorker instance.
230 231 232 233 234 235 236 |
# File 'lib/aws/decider/worker.rb', line 230 def initialize(service, domain, task_list, *args, &block) @activity_definition_map = {} @executor = ForkingExecutor.new(:max_workers => 1) @activity_type_options = [] @options = Utilities::(WorkerOptions, block) super(service, domain, task_list, *args) end |
Instance Method Details
#add_activities_implementation(class_or_instance) ⇒ Object
Adds an Activity implementation to this ActivityWorker.
275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 |
# File 'lib/aws/decider/worker.rb', line 275 def add_activities_implementation(class_or_instance) klass = (class_or_instance.class == Class) ? class_or_instance : class_or_instance.class instance = (class_or_instance.class == Class) ? class_or_instance.new : class_or_instance klass.activities.each do |activity_type| #TODO this should assign to an activityImplementation, so that we can call execute on it later @activity_definition_map[activity_type] = ActivityDefinition.new(instance, activity_type.name.split(".").last, nil, activity_type., activity_type..data_converter) = activity_type. option_hash = { :domain => @domain.name, :name => activity_type.name.to_s, :version => activity_type.version } option_hash.merge!(.) option_hash.merge!(:default_task_list => {:name => .default_task_list}) if .default_task_list @activity_type_options << option_hash end end |
#add_implementation(class_or_instance) ⇒ Object
Adds an Activity implementation to this ActivityWorker.
243 244 245 |
# File 'lib/aws/decider/worker.rb', line 243 def add_implementation(class_or_instance) add_activities_implementation(class_or_instance) end |
#register ⇒ Object
Registers the activity type
249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 |
# File 'lib/aws/decider/worker.rb', line 249 def register @activity_type_options.each do || begin @service.register_activity_type() rescue AWS::SimpleWorkflow::Errors::TypeAlreadyExistsFault => e previous_registration = @service.describe_activity_type(:domain => @domain.name, :activity_type => {:name => [:name], :version => [:version]}) = .select {|key, val| key =~ /default/} previous_keys = previous_registration["configuration"].keys.map {|x| camel_case_to_snake_case(x).to_sym} previous_registration = Hash[previous_keys.zip(previous_registration["configuration"].values)] if previous_registration[:default_task_list] previous_registration[:default_task_list][:name] = previous_registration[:default_task_list].delete("name") end registration_difference = .sort.to_a - previous_registration.sort.to_a raise "There is a difference between the types you have registered previously and the types you are currently registering, but you haven't changed the version. These new changes will not be picked up. In particular, these options are different #{Hash[registration_difference]}" unless registration_difference.empty? # Purposefully eaten up, the alternative is to check first, and who # wants to do two trips when one will do? end end end |
#run_once(should_register = true, poller = nil) ⇒ Object
Starts the Activity that was added to the ActivityWorker and, optionally, sets the ActivityTaskPoller.
316 317 318 319 320 |
# File 'lib/aws/decider/worker.rb', line 316 def run_once(should_register = true, poller = nil) register if should_register poller = ActivityTaskPoller.new(@service, @domain, @task_list, @activity_definition_map, @options) if poller.nil? poller.poll_and_process_single_task(@options.use_forking) end |
#start(should_register = true) ⇒ Object
Starts the Activity that was added to the ActivityWorker
300 301 302 303 304 305 306 |
# File 'lib/aws/decider/worker.rb', line 300 def start(should_register = true) register if should_register poller = ActivityTaskPoller.new(@service, @domain, @task_list, @activity_definition_map, @options) loop do run_once(false, poller) end end |