Class: AWS::Flow::ActivityTaskPoller
- Inherits: Object
  - Object
  - AWS::Flow::ActivityTaskPoller
- Defined in: lib/aws/decider/task_poller.rb
Overview
A poller for activity tasks.
Instance Method Summary
- #execute(task) ⇒ Object
  Executes the specified activity task.
- #initialize(service, domain, task_list, activity_definition_map, executor, options = nil) ⇒ ActivityTaskPoller (constructor)
  Initializes a new `ActivityTaskPoller`.
- #poll_and_process_single_task(use_forking = true) ⇒ Object
  Polls the task list for a new activity task, and executes it if one is found.
- #process_single_task(task) ⇒ Object
  Processes the specified activity task.
- #respond_activity_task_canceled(task_token, message) ⇒ Object
  Responds to the decider that the activity task should be canceled.
- #respond_activity_task_canceled_with_retry(task_token, message) ⇒ Object (private)
  Responds to the decider that the activity task should be canceled, and attempts to retry the task.
- #respond_activity_task_failed(task_token, reason, details) ⇒ Object
  Responds to the decider that the activity task has failed.
- #respond_activity_task_failed_with_retry(task_token, reason, details) ⇒ Object (private)
  Responds to the decider that the activity task has failed, and attempts to retry the task.
Constructor Details
#initialize(service, domain, task_list, activity_definition_map, executor, options = nil) ⇒ ActivityTaskPoller
Initializes a new `ActivityTaskPoller`.
# File 'lib/aws/decider/task_poller.rb', line 116
def initialize(service, domain, task_list, activity_definition_map, executor, options=nil)
  @service = service
  @domain = domain
  @task_list = task_list
  @activity_definition_map = activity_definition_map
  @logger = options.logger if options
  @logger ||= Utilities::LogFactory.make_logger(self)
  @executor = executor
end
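The constructor takes its logger from `options` when one is supplied and otherwise falls back to a default. The following is a minimal sketch of that fallback pattern only. `SketchOptions`, `LoggerFallbackSketch`, and the `:default_logger` placeholder are hypothetical names used for illustration and are not part of aws-flow.

```ruby
# Hypothetical options object exposing only a logger attribute.
SketchOptions = Struct.new(:logger)

# Hypothetical class that reproduces the constructor's logger fallback.
class LoggerFallbackSketch
  attr_reader :logger

  def initialize(options = nil)
    # Use the caller's logger when options were passed at all.
    @logger = options.logger if options
    # Otherwise (or if options carried no logger) fall back to a default.
    # The real class builds the default with Utilities::LogFactory.make_logger.
    @logger ||= :default_logger
  end
end
```

Because the fallback uses `||=`, passing an options object whose logger is `nil` behaves the same as passing no options.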
Instance Method Details
#execute(task) ⇒ Object
Executes the specified activity task.
# File 'lib/aws/decider/task_poller.rb', line 133
def execute(task)
  activity_type = task.activity_type
  begin
    context = ActivityExecutionContext.new(@service, @domain, task)
    unless activity_implementation = @activity_definition_map[activity_type]
      raise "This activity worker was told to work on activity type #{activity_type.name}.#{activity_type.version}, but this activity worker only knows how to work on #{@activity_definition_map.keys.map(&:name).join' '}"
    end

    output, original_result, too_large = activity_implementation.execute(task.input, context)

    @logger.debug "Responding on task_token #{task.task_token} for task #{task}."
    if too_large
      @logger.error "Activity #{activity_type.name}.#{activity_type.version} failed: The output of this activity was too large (greater than 2^15), and therefore aws-flow could not return it to SWF. aws-flow is now attempting to mark this activity as failed. For reference, the result was #{original_result}"
      respond_activity_task_failed_with_retry(
        task.task_token,
        "An activity cannot send a response with a result larger than 32768 characters. Please reduce the response size. A truncated prefix output is included in the details field.",
        output
      )
    elsif ! activity_implementation.execution_options.manual_completion
      @service.respond_activity_task_completed(
        :task_token => task.task_token,
        :result => output
      )
    end
  rescue ActivityFailureException => e
    @logger.error "Activity #{activity_type.name}.#{activity_type.version} with input #{task.input} failed with exception #{e}."
    respond_activity_task_failed_with_retry(
      task.task_token,
      e.message,
      e.details
    )
  end
  #TODO all the completion stuffs
end
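The branching in `#execute` (look up the implementation, run it, then report either completion or an oversized-result failure) can be sketched without SWF. In the sketch below, `FakeService`, `EchoActivity`, and `run_activity` are hypothetical stand-ins and not part of aws-flow. Only the 32768-character limit and the three-element return shape are taken from the listing above, and the truncation logic is an assumption about how an implementation might produce the prefix.

```ruby
# Hypothetical stand-in for the SWF client: records responses instead of sending them.
class FakeService
  attr_reader :responses

  def initialize
    @responses = []
  end

  def respond_activity_task_completed(params)
    @responses << [:completed, params]
  end

  def respond_activity_task_failed(params)
    @responses << [:failed, params]
  end
end

# Limit quoted in the failure message of #execute.
MAX_RESULT_SIZE = 32_768

# Hypothetical activity implementation. It returns [output, original_result, too_large],
# the same three-element shape that #execute destructures.
class EchoActivity
  def execute(input, _context)
    result = input.to_s
    too_large = result.size > MAX_RESULT_SIZE
    output = too_large ? result[0, MAX_RESULT_SIZE] : result
    [output, result, too_large]
  end
end

# Simplified model of #execute: look up the implementation, run it, then respond.
def run_activity(service, definition_map, type_name, input)
  implementation = definition_map[type_name]
  raise "No implementation registered for #{type_name}" unless implementation

  output, _original, too_large = implementation.execute(input, nil)
  if too_large
    service.respond_activity_task_failed(:reason => 'result too large', :details => output)
  else
    service.respond_activity_task_completed(:result => output)
  end
end
```

Calling `run_activity(FakeService.new, { 'echo' => EchoActivity.new }, 'echo', 'hello')` records a completed response, while an input longer than the limit records a failure whose details hold the truncated prefix.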
#poll_and_process_single_task(use_forking = true) ⇒ Object
Polls the task list for a new activity task, and executes it if one is found.
If `use_forking` is set to `true` and the maximum number of workers (as set in #initialize) is already executing, this method blocks until the number of running workers drops below the maximum.
# File 'lib/aws/decider/task_poller.rb', line 295
def poll_and_process_single_task(use_forking = true)
  @poll_semaphore ||= SuspendableSemaphore.new
  @poll_semaphore.acquire
  semaphore_needs_release = true
  @logger.debug "Before the poll"
  begin
    if use_forking
      @executor.block_on_max_workers
    end
    @logger.debug "Polling for a new activity task of type #{@activity_definition_map.keys.map{ |x| "#{x.name} #{x.version}"} } on task_list: #{@task_list}"
    task = @domain.activity_tasks.poll_for_single_task(@task_list)
    if task
      @logger.info Utilities.activity_task_to_debug_string("Got activity task", task)
    end
  rescue Exception => e
    @logger.error "Error in the poller, #{e.class}, #{e}"
    @poll_semaphore.release
    return false
  end
  if task.nil?
    @logger.debug "Didn't get a task on task_list: #{@task_list}"
    @poll_semaphore.release
    return false
  end
  semaphore_needs_release = false
  if use_forking
    @executor.execute { process_single_task(task) }
  else
    process_single_task(task)
  end
  @logger.info Utilities.activity_task_to_debug_string("Finished executing task", task)
  return true
end
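As the listing shows, the method returns `true` when a task was received and handed off, and `false` when the poll failed or came back empty. A caller can use that return value to drive a loop. The sketch below illustrates this with a hypothetical `FakePoller` that mimics only the return-value contract, and a hypothetical `drain` helper. Neither is part of aws-flow.

```ruby
# Hypothetical poller that mimics only the return-value contract of
# #poll_and_process_single_task: true when a task was handled, false otherwise.
class FakePoller
  attr_reader :processed

  def initialize(tasks)
    @tasks = tasks.dup
    @processed = []
  end

  def poll_and_process_single_task(use_forking = true)
    task = @tasks.shift
    return false if task.nil?

    @processed << task
    true
  end
end

# Calls the poller until a poll comes back empty and returns the number of tasks handled.
def drain(poller)
  handled = 0
  handled += 1 while poller.poll_and_process_single_task(false)
  handled
end
```

A long-running worker would more likely loop indefinitely rather than stop at the first empty poll. Stopping early here just keeps the sketch finite.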
#process_single_task(task) ⇒ Object
Processes the specified activity task.
# File 'lib/aws/decider/task_poller.rb', line 244
def process_single_task(task)
  # We are using the 'build' method to create a new ConnectionPool here to
  # make sure that connection pools are not shared among forked processes.
  # The default behavior of the ConnectionPool class is to cache a pool
  # for a set of options created by the 'new' method and always use the
  # same pool for the same set of options. This is undesirable when
  # multiple processes want to use different connection pools with same
  # options as is the case here.
  #
  # Since we can't change the pool of an already existing NetHttpHandler,
  # we also create a new NetHttpHandler in order to use the new pool.
  options = @service.config.to_h
  options[:connection_pool] = AWS::Core::Http::ConnectionPool.build(options[:http_handler].pool.options)
  options[:http_handler] = AWS::Core::Http::NetHttpHandler.new(options)
  @service = AWS::SimpleWorkflow.new(options).client

  begin
    begin
      execute(task)
    rescue CancellationException => e
      @logger.error "Got an error, #{e.message}, while executing #{task.activity_type.name}."
      respond_activity_task_canceled_with_retry(task.task_token, e.message)
    rescue Exception => e
      @logger.error "Got an error, #{e.message}, while executing #{task.activity_type.name}. Full stack trace: #{e.backtrace}"
      respond_activity_task_failed_with_retry(task.task_token, e.message, e.backtrace)
      #Do rescue stuff
    ensure
      @poll_semaphore.release
    end
  rescue Exception => e
    semaphore_needs_release = true
    @logger.error "Got into the other error mode: #{e}"
    raise e
  ensure
    @poll_semaphore.release if semaphore_needs_release
  end
end
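The inner `begin`/`rescue`/`ensure` of `#process_single_task` maps a cancellation to a "canceled" response, maps any other error to a "failed" response, and releases the poll semaphore in every case. The sketch below models only that mapping. `FakeCancellation`, `FakeSemaphore`, and `process` are hypothetical names, and the sketch rescues `StandardError` where the library rescues `Exception`.

```ruby
# Hypothetical stand-in for the library's CancellationException.
class FakeCancellation < StandardError; end

# Hypothetical semaphore stand-in that only counts releases.
class FakeSemaphore
  attr_reader :releases

  def initialize
    @releases = 0
  end

  def release
    @releases += 1
  end
end

# Runs the given block, records how a failure would be reported,
# and always releases the semaphore afterwards.
def process(semaphore, responses)
  yield
rescue FakeCancellation => e
  responses << [:canceled, e.message]
rescue StandardError => e
  responses << [:failed, e.message]
ensure
  semaphore.release
end
```

A block that succeeds records nothing, yet the semaphore is still released, which mirrors the `ensure` clause in the listing.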
#respond_activity_task_canceled(task_token, message) ⇒ Object
Responds to the decider that the activity task should be canceled. No retry is attempted.
# File 'lib/aws/decider/task_poller.rb', line 210
def respond_activity_task_canceled(task_token, message)
  @service.respond_activity_task_canceled({:task_token => task_token, :details => message})
end
#respond_activity_task_canceled_with_retry(task_token, message) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Retry behavior for this method is currently *not implemented*. For now, it simply wraps #respond_activity_task_canceled.
Responds to the decider that the activity task should be canceled, and attempts to retry the task.
# File 'lib/aws/decider/task_poller.rb', line 192
def respond_activity_task_canceled_with_retry(task_token, message)
  if @failure_retrier.nil?
    respond_activity_task_canceled(task_token, message)
  end
  #TODO Set up other stuff to do if we have it
end
#respond_activity_task_failed(task_token, reason, details) ⇒ Object
Responds to the decider that the activity task has failed. No retry is attempted.
# File 'lib/aws/decider/task_poller.rb', line 232
def respond_activity_task_failed(task_token, reason, details)
  @logger.debug "The task token to be reported on is #{task_token}"
  @service.respond_activity_task_failed(:task_token => task_token, :reason => reason.to_s, :details => details.to_s)
end
#respond_activity_task_failed_with_retry(task_token, reason, details) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Retry behavior for this method is currently *not implemented*. For now, it simply wraps #respond_activity_task_failed.
Responds to the decider that the activity task has failed, and attempts to retry the task.
# File 'lib/aws/decider/task_poller.rb', line 177
def respond_activity_task_failed_with_retry(task_token, reason, details)
  #TODO Set up this variable
  if @failure_retrier.nil?
    respond_activity_task_failed(task_token, reason, details)
    #TODO Set up other stuff to do if we have it
  end
end
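Neither `_with_retry` listing assigns `@failure_retrier`, and the constructor does not set it either. An unassigned instance variable reads as `nil` in Ruby, so the guard always passes and the wrapper delegates straight to the non-retrying method. The sketch below demonstrates that behavior with a hypothetical `RetryWrapperSketch` class, which is not part of aws-flow.

```ruby
# Hypothetical class showing why the *_with_retry wrappers currently always delegate.
class RetryWrapperSketch
  attr_reader :calls

  def initialize
    @calls = []
  end

  # Stand-in for #respond_activity_task_failed: records the stringified arguments.
  def respond_failed(token, reason, details)
    @calls << [token, reason.to_s, details.to_s]
  end

  # Stand-in for #respond_activity_task_failed_with_retry.
  def respond_failed_with_retry(token, reason, details)
    # @failure_retrier is never assigned, so it reads as nil and this branch always runs.
    respond_failed(token, reason, details) if @failure_retrier.nil?
  end
end
```

If a retrier were ever assigned, this guard would skip the response entirely, which is presumably what the TODO comments in the listings are meant to address.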