Class: AWS::Flow::ActivityTaskPoller

Inherits:
Object
Defined in:
lib/aws/decider/task_poller.rb

Overview

A poller for activity tasks.

Constructor Details

#initialize(service, domain, task_list, activity_definition_map, executor, options = nil) ⇒ ActivityTaskPoller

Initializes a new `ActivityTaskPoller`.

Parameters:

  • service

    Required. The AWS::SimpleWorkflow instance to use.

  • domain

    Required. The domain used by the workflow.

  • task_list

    Required. The task list used to poll for activity tasks.

  • activity_definition_map

    Required. A map from each activity type to the AWS::Flow::ActivityDefinition instance that implements it. This map is in the form:

    { :activity_type => 'activity_definition_name' }
    

    The named activity definition will be run when the #execute method is called.

  • executor

    Required. The executor that runs tasks when forking is enabled. #poll_and_process_single_task calls its block_on_max_workers and execute methods.

  • options (defaults to: nil)

    Optional. Options to set for the activity poller. You can set the following options:

    • `logger` - The logger to use.

    • `max_workers` - The maximum number of workers that can be running at once. The default is 20.

# File 'lib/aws/decider/task_poller.rb', line 119

def initialize(service, domain, task_list, activity_definition_map, executor, options=nil)
  @service = service
  @domain = domain
  @task_list = task_list
  @activity_definition_map = activity_definition_map
  @logger = options.logger if options
  @logger ||= Utilities::LogFactory.make_logger(self)
  @executor = executor
end
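
The logger fallback in the constructor can be illustrated with a minimal, self-contained sketch. `PollerOptions` and `resolve_logger` are hypothetical names used only for illustration, and Ruby's standard `Logger` stands in for `Utilities::LogFactory.make_logger`; none of them is part of aws-flow.

```ruby
require 'logger'

# Hypothetical stand-in for the options object. The constructor above only
# requires that it respond to #logger.
PollerOptions = Struct.new(:logger, :max_workers)

# Mirrors the constructor's pattern: use the caller's logger when one is
# supplied, otherwise fall back to a default logger.
def resolve_logger(options)
  logger = options.logger if options
  logger || Logger.new($stderr)
end

custom = Logger.new($stdout)
with_custom  = resolve_logger(PollerOptions.new(custom, 20)) # the caller's logger
with_default = resolve_logger(nil)                           # a fallback Logger
```

Passing `nil` for the options, or an options object whose `logger` is `nil`, both end at the fallback, which matches the `@logger ||= ...` line in the listing.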

Instance Method Details

#execute(task) ⇒ Object

Executes the specified activity task.

Parameters:

  • task

    Required. The activity task to execute.


# File 'lib/aws/decider/task_poller.rb', line 136

def execute(task)
  activity_type = task.activity_type
  begin
    context = ActivityExecutionContext.new(@service, @domain, task)
    unless activity_implementation = @activity_definition_map[activity_type]
      raise "This activity worker was told to work on activity type #{activity_type.name}.#{activity_type.version}, but this activity worker only knows how to work on #{@activity_definition_map.keys.map(&:name).join(' ')}"
    end

    output, original_result, too_large = activity_implementation.execute(task.input, context)

    @logger.debug "Responding on task_token #{task.task_token} for task #{task}."
    if too_large
      @logger.error "Activity #{activity_type.name}.#{activity_type.version} failed: The output of this activity was too large (greater than 2^15), and therefore aws-flow could not return it to SWF. aws-flow is now attempting to mark this activity as failed. For reference, the result was #{original_result}"

      respond_activity_task_failed_with_retry(
        task.task_token,
        "An activity cannot send a response with a result larger than 32768 characters. Please reduce the response size. A truncated prefix output is included in the details field.",
        output
      )
    elsif ! activity_implementation.execution_options.manual_completion
      @service.respond_activity_task_completed(
        :task_token => task.task_token,
        :result => output
      )
    end
  rescue ActivityFailureException => e
    @logger.error "Activity #{activity_type.name}.#{activity_type.version} with input #{task.input} failed with exception #{e}."

    respond_activity_task_failed_with_retry(
      task.task_token,
      e.message,
      e.details
    )
  end
  #TODO all the completion stuffs
end
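
The two checks that #execute performs, looking up the implementation and detecting an oversized result, can be sketched in isolation. This is a rough sketch under stated assumptions: `ActivityType`, `lookup_implementation`, and `check_result_size` are hypothetical names, and the truncation shown here is only a guess at how the `[output, original_result, too_large]` triple might be produced, since that logic is not part of this listing.

```ruby
# Hypothetical stand-in for an activity type; not the aws-flow class.
ActivityType = Struct.new(:name, :version)

# 2**15, the limit quoted in the error messages above.
MAX_RESULT_SIZE = 32_768

# Returns the implementation registered for the type, or raises when this
# worker does not know how to run it.
def lookup_implementation(definition_map, activity_type)
  definition_map.fetch(activity_type) do
    known = definition_map.keys.map(&:name).join(' ')
    raise "Unknown activity type #{activity_type.name}.#{activity_type.version}; known types: #{known}"
  end
end

# Assumption: returns the same three-element shape that #execute
# destructures, truncating the output to a prefix when it is too large.
def check_result_size(result)
  too_large = result.size > MAX_RESULT_SIZE
  output = too_large ? result[0, MAX_RESULT_SIZE] : result
  [output, result, too_large]
end

resize = ActivityType.new('resize_image', '1.0')
definition_map = { resize => ->(input) { input.upcase } }

implementation = lookup_implementation(definition_map, resize)
output, original_result, too_large = check_result_size(implementation.call('ok'))
```

Because `Struct` instances compare by value, two `ActivityType` objects with the same name and version resolve to the same map entry.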

#poll_and_process_single_task(use_forking = true) ⇒ Object

Polls the task list for a new activity task, and executes it if one is found.

If `use_forking` is set to `true` and the maximum number of workers (as set in #initialize) is already executing, this method blocks until the number of running workers drops below the maximum.

Parameters:

  • use_forking (defaults to: true)

    Optional. Whether to use forking to execute the task. On Windows, you should set this to `false`.



# File 'lib/aws/decider/task_poller.rb', line 318

def poll_and_process_single_task(use_forking = true)
  @poll_semaphore ||= SuspendableSemaphore.new
  @poll_semaphore.acquire
  semaphore_needs_release = true
  @logger.debug "Before the poll"
  begin
    if use_forking
      @executor.block_on_max_workers
    end
    @logger.debug "Polling for a new activity task of type #{@activity_definition_map.keys.map{ |x| "#{x.name} #{x.version}"} } on task_list: #{@task_list}"
    task = @domain.activity_tasks.poll_for_single_task(@task_list)
    if task
      @logger.info Utilities.activity_task_to_debug_string("Got activity task", task)
    end
  rescue Exception => e
    @logger.error "Error in the poller, #{e.class}, #{e}"
    @poll_semaphore.release
    return false
  end
  if task.nil?
    @logger.debug "Didn't get a task on task_list: #{@task_list}"
    @poll_semaphore.release
    return false
  end
  semaphore_needs_release = false
  if use_forking
    @executor.execute { process_single_task(task) }
  else
    process_single_task(task)
  end
  @logger.info Utilities.activity_task_to_debug_string("Finished executing task", task)
  return true
end
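
The blocking behavior described for `use_forking` can be sketched with a small bounded executor. `BoundedExecutor` is a hypothetical class written for this illustration: it borrows the method names `block_on_max_workers` and `execute` from the listing, but it uses threads instead of forked processes so that it runs on any platform, and it assumes a single thread submits work.

```ruby
# Hypothetical bounded executor; not the aws-flow implementation.
class BoundedExecutor
  def initialize(max_workers)
    @max_workers = max_workers
    @running = 0
    @threads = []
    @lock = Mutex.new
    @slot_free = ConditionVariable.new
  end

  # Blocks the caller until fewer than max_workers tasks are running.
  def block_on_max_workers
    @lock.synchronize do
      @slot_free.wait(@lock) while @running >= @max_workers
    end
  end

  # Runs the block on a new thread and frees its slot when it finishes.
  def execute(&task)
    @lock.synchronize { @running += 1 }
    @threads << Thread.new do
      begin
        task.call
      ensure
        @lock.synchronize do
          @running -= 1
          @slot_free.signal
        end
      end
    end
  end

  # Waits for every started task to finish.
  def shutdown
    @threads.each(&:join)
  end
end

executor = BoundedExecutor.new(2)
stats_lock = Mutex.new
active = 0
peak = 0

5.times do
  executor.block_on_max_workers
  executor.execute do
    stats_lock.synchronize do
      active += 1
      peak = [peak, active].max
    end
    sleep 0.01
    stats_lock.synchronize { active -= 1 }
  end
end
executor.shutdown
```

With a limit of 2, the fifth submission cannot start until earlier tasks finish, so `peak` never exceeds the limit.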

#process_single_task(task) ⇒ Object

Processes the specified activity task.

Parameters:

  • task

    Required. The activity task to process.


# File 'lib/aws/decider/task_poller.rb', line 268

def process_single_task(task)

  # We are using the 'build' method to create a new ConnectionPool here to
  # make sure that connection pools are not shared among forked processes.
  # The default behavior of the ConnectionPool class is to cache a pool
  # for a set of options created by the 'new' method and always use the 
  # same pool for the same set of options. This is undesirable when
  # multiple processes want to use different connection pools with same
  # options as is the case here.
  # Since we can't change the pool of an already existing NetHttpHandler,
  # we also create a new NetHttpHandler in order to use the new pool.

  options = @service.config.to_h
  options[:connection_pool] = AWS::Core::Http::ConnectionPool.build(options[:http_handler].pool.options)
  options[:http_handler] = AWS::Core::Http::NetHttpHandler.new(options)
  @service = AWS::SimpleWorkflow.new(options).client

  begin
    begin
      execute(task)
    rescue CancellationException => e
      @logger.error "Got an error, #{e.message}, while executing #{task.activity_type.name}."
      respond_activity_task_canceled_with_retry(task.task_token, e.message)
    rescue Exception => e
      @logger.error "Got an error, #{e.message}, while executing #{task.activity_type.name}. Full stack trace: #{e.backtrace}"
      respond_activity_task_failed_with_retry(task.task_token, e.message, e.backtrace)
      #Do rescue stuff
    ensure
      @poll_semaphore.release
    end
  rescue Exception => e
    semaphore_needs_release = true
    @logger.error "Got into the other error mode: #{e}"
    raise e
  ensure
    @poll_semaphore.release if semaphore_needs_release
  end
end
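
The way #process_single_task maps exceptions to responses can be reduced to a short sketch. `CancellationError` and `process` are hypothetical names, the responses are recorded in an array instead of being sent to the service, and the sketch rescues `StandardError` where the listing rescues `Exception`.

```ruby
# Hypothetical stand-in for the library's cancellation exception.
class CancellationError < StandardError; end

# Runs the task and records which response it would produce. The ensure
# clause runs on every path, which is where the listing releases the
# poll semaphore.
def process(task, responses, releases)
  task.call
  responses << [:completed]
rescue CancellationError => e
  responses << [:canceled, e.message]
rescue StandardError => e
  responses << [:failed, e.message]
ensure
  releases << :released
end

responses = []
releases = []
process(-> { :ok }, responses, releases)
process(-> { raise CancellationError, 'stop requested' }, responses, releases)
process(-> { raise 'boom' }, responses, releases)
```

The order of the rescue clauses matters: the cancellation clause must come first, because the more general clause would otherwise match it too.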

#respond_activity_task_canceled(task_token, message) ⇒ Object

Responds to the decider that the activity task should be canceled. No retry is attempted.

Parameters:

  • task_token

    Required. The task token from the AWS::Flow::ActivityDefinition object to retry.

  • message

    Required. A message that provides detail about why the activity task is canceled.



# File 'lib/aws/decider/task_poller.rb', line 234

def respond_activity_task_canceled(task_token, message)
  @service.respond_activity_task_canceled({:task_token => task_token, :details => message})
end

#respond_activity_task_canceled_with_retry(task_token, message) ⇒ Object

Note:

Retry behavior for this method is currently *not implemented*. For now, it simply wraps #respond_activity_task_canceled.

Responds to the decider that the activity task should be canceled, and attempts to retry the task.

Parameters:

  • task_token

    Required. The task token from the AWS::Flow::ActivityDefinition object to retry.

  • message

    Required. A message that provides detail about why the activity task is canceled.



# File 'lib/aws/decider/task_poller.rb', line 216

def respond_activity_task_canceled_with_retry(task_token, message)
  if @failure_retrier.nil?
    respond_activity_task_canceled(task_token, message)
  end
  #TODO Set up other stuff to do if we have it
end

#respond_activity_task_failed(task_token, reason, details) ⇒ Object

Responds to the decider that the activity task has failed. No retry is attempted.

Parameters:

  • task_token

    Required. The task token from the AWS::Flow::ActivityDefinition object to retry. The task token is generated by the service and should be treated as an opaque value.

  • reason

    Required. Description of the error that may assist in diagnostics. Although this value is required, you can set it to an empty string if you don’t need this information.

  • details

    Required. Detailed information about the failure. Although this value is required, you can set it to an empty string if you don’t need this information.



# File 'lib/aws/decider/task_poller.rb', line 256

def respond_activity_task_failed(task_token, reason, details)
  @logger.debug "The task token to be reported on is #{task_token}"
  @service.respond_activity_task_failed(:task_token => task_token, :reason => reason.to_s, :details => details.to_s)
end
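
Because `reason` and `details` are converted with `to_s`, callers can pass non-string values such as an exception or a backtrace array, as #process_single_task does. A small sketch of the resulting request hash, where `failure_payload` is a hypothetical helper that only builds the hash and sends nothing:

```ruby
# Hypothetical helper that builds the same hash shape as the call above.
def failure_payload(task_token, reason, details)
  { :task_token => task_token, :reason => reason.to_s, :details => details.to_s }
end

payload = failure_payload('token-123', RuntimeError.new('boom'), ['a.rb:1', 'b.rb:2'])
```

Here `payload[:reason]` is the exception's message and `payload[:details]` is the array's inspected form, so a long backtrace becomes one long string.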

#respond_activity_task_failed_with_retry(task_token, reason, details) ⇒ Object

Note:

Retry behavior for this method is currently *not implemented*. For now, it simply wraps #respond_activity_task_failed.

Responds to the decider that the activity task has failed, and attempts to retry the task.

Parameters:

  • task_token

    Required. The task token from the AWS::Flow::ActivityDefinition object to retry. The task token is generated by the service and should be treated as an opaque value.

  • reason

    Required. Description of the error that may assist in diagnostics. Although this value is required, you can set it to an empty string if you don’t need this information.

  • details

    Required. Detailed information about the failure. Although this value is required, you can set it to an empty string if you don’t need this information.



# File 'lib/aws/decider/task_poller.rb', line 194

def respond_activity_task_failed_with_retry(task_token, reason, details)
  #TODO Set up this variable
  if @failure_retrier.nil?
    respond_activity_task_failed(task_token, reason, details)
    #TODO Set up other stuff to do if we have it
  end
end