Class: AWS::Flow::ActivityTaskPoller
- Inherits:
-
Object
- Object
- AWS::Flow::ActivityTaskPoller
- Defined in:
- lib/aws/decider/task_poller.rb
Overview
A poller for activity tasks.
Instance Method Summary collapse
-
#execute(task) ⇒ Object
Executes the specified activity task.
-
#initialize(service, domain, task_list, activity_definition_map, executor, options = nil) ⇒ ActivityTaskPoller
constructor
Initializes a new ‘ActivityTaskPoller`.
-
#poll_and_process_single_task(use_forking = true) ⇒ Object
Polls the task list for a new activity task, and executes it if one is found.
-
#process_single_task(task) ⇒ Object
Processes the specified activity task.
-
#respond_activity_task_canceled(task_token, message) ⇒ Object
Responds to the decider that the activity task should be canceled.
-
#respond_activity_task_canceled_with_retry(task_token, message) ⇒ Object
private
Responds to the decider that the activity task should be canceled, and attempts to retry the task.
-
#respond_activity_task_failed(task_token, reason, details) ⇒ Object
Responds to the decider that the activity task has failed.
-
#respond_activity_task_failed_with_retry(task_token, reason, details) ⇒ Object
private
Responds to the decider that the activity task has failed, and attempts to retry the task.
Constructor Details
#initialize(service, domain, task_list, activity_definition_map, executor, options = nil) ⇒ ActivityTaskPoller
Initializes a new ‘ActivityTaskPoller`.
137 138 139 140 141 142 143 144 145 146 |
# File 'lib/aws/decider/task_poller.rb', line 137 def initialize(service, domain, task_list, activity_definition_map, executor, =nil) @service = service @service_opts = @service.config.to_h @domain = domain @task_list = task_list @activity_definition_map = activity_definition_map @logger = .logger if @logger ||= Utilities::LogFactory.make_logger(self) @executor = executor end |
Instance Method Details
#execute(task) ⇒ Object
Executes the specified activity task.
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/aws/decider/task_poller.rb', line 155 def execute(task) activity_type = task.activity_type begin context = ActivityExecutionContext.new(@service, @domain, task) unless activity_implementation = @activity_definition_map[activity_type] raise "This activity worker was told to work on activity type "\ "#{activity_type.inspect}, but this activity worker only knows "\ "how to work on #{@activity_definition_map.keys.map(&:name).join' '}" end output, original_result, too_large = activity_implementation.execute(task.input, context) @logger.debug "Responding on task_token #{task.task_token.inspect}." if too_large @logger.error "#{task.activity_type.inspect} failed: "\ "#{Utilities.validation_error_string_partial("Activity")} For "\ "reference, the result was #{original_result}" respond_activity_task_failed_with_retry( task.task_token, Utilities.validation_error_string("Activity"), "" ) elsif ! activity_implementation..manual_completion @service.respond_activity_task_completed( :task_token => task.task_token, :result => output ) end rescue ActivityFailureException => e @logger.error "#{task.activity_type.inspect} failed with exception: #{e.inspect}." respond_activity_task_failed_with_retry( task.task_token, e., e.details ) end end |
#poll_and_process_single_task(use_forking = true) ⇒ Object
Polls the task list for a new activity task, and executes it if one is found.
If ‘use_forking` is set to `true` and the maximum number of workers (as set in #initialize) are already executing, this method will block until the number of running workers is less than the maximum.
359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 |
# File 'lib/aws/decider/task_poller.rb', line 359 def poll_and_process_single_task(use_forking = true) @poll_semaphore ||= SuspendableSemaphore.new @poll_semaphore.acquire semaphore_needs_release = true begin if use_forking @executor.block_on_max_workers end @logger.debug "Polling for a new activity task of type #{@activity_definition_map.keys.map{ |x| "#{x.name} #{x.version}"} } on task_list: #{@task_list}" task = @domain.activity_tasks.poll_for_single_task(@task_list) if task @logger.info Utilities.activity_task_to_debug_string("Got activity task", task) end rescue Exception => e @logger.error "Error in the poller, #{e.inspect}" @poll_semaphore.release return false end if task.nil? @logger.debug "Didn't get a task on task_list: #{@task_list}" @poll_semaphore.release return false end semaphore_needs_release = false if use_forking @executor.execute { process_single_task(task) } else process_single_task(task) end @logger.info Utilities.activity_task_to_debug_string("Finished executing task", task) return true end |
#process_single_task(task) ⇒ Object
Processes the specified activity task.
310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 |
# File 'lib/aws/decider/task_poller.rb', line 310 def process_single_task(task) # We are using the 'build' method to create a new ConnectionPool here to # make sure that connection pools are not shared among forked processes. # The default behavior of the ConnectionPool class is to cache a pool # for a set of options created by the 'new' method and always use the # same pool for the same set of options. This is undesirable when # multiple processes want to use different connection pools with same # options as is the case here. # # Since we can't change the pool of an already existing NetHttpHandler, # we also create a new NetHttpHandler in order to use the new pool. @service_opts[:connection_pool] = AWS::Core::Http::ConnectionPool.build(@service_opts[:http_handler].pool.) @service_opts[:http_handler] = AWS::Core::Http::NetHttpHandler.new(@service_opts) @service = @service.(@service_opts) begin begin execute(task) rescue CancellationException => e @logger.error "#{task.activity_type.inspect} failed with exception: #{e.inspect}" respond_activity_task_canceled_with_retry(task.task_token, e.) rescue Exception => e @logger.error "#{task.activity_type.inspect} failed with exception: #{e.inspect}" respond_activity_task_failed_with_retry(task.task_token, e., e.backtrace) ensure @poll_semaphore.release end rescue Exception => e semaphore_needs_release = true @logger.error "Error in the poller, exception: #{e.inspect}. stacktrace: #{e.backtrace}" raise e ensure @poll_semaphore.release if semaphore_needs_release end end |
#respond_activity_task_canceled(task_token, message) ⇒ Object
Responds to the decider that the activity task should be canceled. No retry is attempted.
234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 |
# File 'lib/aws/decider/task_poller.rb', line 234 def respond_activity_task_canceled(task_token, ) begin @service.respond_activity_task_canceled( :task_token => task_token, :details => ) rescue AWS::SimpleWorkflow::Errors::ValidationException => e if e..include? "failed to satisfy constraint: Member must have length less than or equal to" # We want to ensure that the ActivityWorker doesn't just sit # around and time the activity out. If there is a validation failure # possibly because of large custom exceptions we should fail the # activity task with some minimal details respond_activity_task_failed_with_retry( task_token, Utilities.validation_error_string("Activity"), "AWS::SimpleWorkflow::Errors::ValidationException" ) end @logger.error "respond_activity_task_canceled call failed with "\ "exception: #{e.inspect}" end end |
#respond_activity_task_canceled_with_retry(task_token, message) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Retry behavior for this method is currently *not implemented*. For now, it simply wraps #respond_activity_task_canceled.
Responds to the decider that the activity task should be canceled, and attempts to retry the task.
216 217 218 219 220 221 |
# File 'lib/aws/decider/task_poller.rb', line 216 def respond_activity_task_canceled_with_retry(task_token, ) if @failure_retrier.nil? respond_activity_task_canceled(task_token, ) end #TODO Set up other stuff to do if we have it end |
#respond_activity_task_failed(task_token, reason, details) ⇒ Object
Responds to the decider that the activity task has failed. No retry is attempted.
277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 |
# File 'lib/aws/decider/task_poller.rb', line 277 def respond_activity_task_failed(task_token, reason, details) @logger.debug "The task token to be reported on is #{task_token}" begin @service.respond_activity_task_failed( task_token: task_token, reason: reason.to_s, details: details.to_s ) rescue AWS::SimpleWorkflow::Errors::ValidationException => e if e..include? "failed to satisfy constraint: Member must have length less than or equal to" # We want to ensure that the ActivityWorker doesn't just sit # around and time the activity out. If there is a validation failure # possibly because of large custom exceptions we should fail the # activity task with some minimal details respond_activity_task_failed_with_retry( task_token, Utilities.validation_error_string("Activity"), "AWS::SimpleWorkflow::Errors::ValidationException" ) end @logger.error "respond_activity_task_failed call failed with "\ "exception: #{e.inspect}" end end |
#respond_activity_task_failed_with_retry(task_token, reason, details) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Retry behavior for this method is currently *not implemented*. For now, it simply wraps #respond_activity_task_failed.
Responds to the decider that the activity task has failed, and attempts to retry the task.
201 202 203 204 205 206 207 |
# File 'lib/aws/decider/task_poller.rb', line 201 def respond_activity_task_failed_with_retry(task_token, reason, details) #TODO Set up this variable if @failure_retrier.nil? respond_activity_task_failed(task_token, reason, details) #TODO Set up other stuff to do if we have it end end |