Class: AWS::Flow::ActivityTaskPoller

Inherits:
Object
Defined in:
lib/aws/decider/task_poller.rb

Instance Method Summary

Constructor Details

#initialize(service, domain, task_list, activity_definition_map, executor, options = nil) ⇒ ActivityTaskPoller

Returns a new instance of ActivityTaskPoller.



84
85
86
87
88
89
90
91
92
# File 'lib/aws/decider/task_poller.rb', line 84

# Builds a poller from the collaborators it needs to fetch and run activity
# tasks.
#
# @param service the service client stored in @service
# @param domain the domain stored in @domain
# @param task_list the task list stored in @task_list
# @param activity_definition_map lookup of activity implementations, stored
#   in @activity_definition_map
# @param executor the executor stored in @executor
# @param options optional object; when given, its #logger is used as the
#   poller's logger
def initialize(service, domain, task_list, activity_definition_map, executor, options=nil)
  @service, @domain, @task_list = service, domain, task_list
  @activity_definition_map = activity_definition_map
  # Prefer a logger supplied through the options object.
  if options
    @logger = options.logger
  end
  # Otherwise fall back to a factory-made "debug" logger.
  unless @logger
    @logger = Utilities::LogFactory.make_logger(self, "debug")
  end
  @executor = executor
end

Instance Method Details

#execute(task) ⇒ Object



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/aws/decider/task_poller.rb', line 94

# Runs the activity implementation registered for the task's activity type
# and reports the outcome to the service.
#
# The implementation's #execute is expected to return three values: the
# converted output, the original result, and a flag saying the output was too
# large to send back.
#
# @param task the activity task; its activity_type, input and task_token are
#   read here
# @raise [RuntimeError] when no implementation is registered for the task's
#   activity type
def execute(task)
  requested_type = task.activity_type
  begin
    execution_context = ActivityExecutionContext.new(@service, @domain, task)
    implementation = @activity_definition_map[requested_type]
    unless implementation
      raise "This activity worker was told to work on activity type #{requested_type.name}, but this activity worker only knows how to work on #{@activity_definition_map.keys.map(&:name).join(' ')}"
    end
    converted_output, raw_result, oversized = implementation.execute(task.input, execution_context)
    @logger.debug "Responding on task_token #{task.task_token} for task #{task}"
    if oversized
      # The output cannot be returned, so the task is failed instead and the
      # converted output is passed along as the failure details.
      @logger.warn "The output of this activity was too large (greater than 2^15), and therefore aws-flow could not return it to SWF. aws-flow is now attempting to mark this activity as failed. For reference, the result was #{raw_result}"
      respond_activity_task_failed_with_retry(task.task_token, "An activity cannot send a response with a result larger than 32768 characters. Please reduce the response size. A truncated prefix output is included in the details field.", converted_output)
    else
      # Manually completed activities are not reported as completed from here.
      completes_manually = implementation.execution_options.manual_completion
      unless completes_manually
        @service.respond_activity_task_completed(:task_token => task.task_token, :result => converted_output)
      end
    end
  rescue ActivityFailureException => e
    # Both locals are nil when the failure happened before #execute returned.
    @logger.debug "The activity failed, with original output of #{raw_result} and dataconverted result of #{converted_output}. aws-flow will now attempt to fail it."
    respond_activity_task_failed_with_retry(task.task_token, e.message, e.details)
  end
  #TODO all the completion stuffs
end

#poll_and_process_single_task(use_forking = true) ⇒ Object



167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/aws/decider/task_poller.rb', line 167

# Polls the task list once and, if a task is returned, processes it.
#
# The poll semaphore is acquired before polling. On the two early-exit paths
# (poll error, or no task returned) it is released here; once a task has been
# handed to #process_single_task, releasing it is that method's job.
#
# @param use_forking [Boolean] when true, waits for executor capacity before
#   polling and runs the task through the executor; when false, the task is
#   processed inline
# @return [Boolean] true when a task was received and dispatched, false when
#   polling failed or returned no task
def poll_and_process_single_task(use_forking = true)
  @poll_semaphore ||= SuspendableSemaphore.new
  @poll_semaphore.acquire
  @logger.debug "before the poll\n\n"
  # This is to warm the lazily loaded clients in the @service, so we don't
  # pay for their loading in every forked client
  @service.config.to_h
  begin
    @executor.block_on_max_workers if use_forking
    task = @domain.activity_tasks.poll_for_single_task(@task_list)
  rescue Exception => e
    # Deliberately broad: whatever interrupts the poll, the semaphore taken
    # above must be handed back before bailing out.
    @logger.error "I have not been able to poll successfully, and am now bailing out, with error #{e}"
    @poll_semaphore.release
    return false
  end
  # An empty poll is a normal outcome, not an error. The task is inspected
  # only after this check, so a nil task is no longer misreported as a poll
  # failure.
  if task.nil?
    @logger.debug "Still polling at #{Time.now}, but didn't get anything"
    @poll_semaphore.release
    return false
  end
  @logger.debug "got a task, #{task.activity_type.name}"
  @logger.debug "The task token I got was: #{task.task_token}"
  if use_forking
    @executor.execute { process_single_task(task) }
  else
    process_single_task(task)
  end
  @logger.debug "finished executing the task"
  true
end

#process_single_task(task) ⇒ Object



140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/aws/decider/task_poller.rb', line 140

# Executes one activity task and reports cancellation or failure to the
# service, then releases the poll semaphore exactly once.
#
# Any exception raised by #execute is turned into a "failed" response, and a
# CancellationException into a "canceled" response. If sending that response
# (or rebuilding the client) raises, the error is logged and re-raised to the
# caller.
#
# @param task the activity task to run; its task_token and activity_type are
#   read here
# @raise [Exception] whatever the client rebuild or the failure/cancel
#   responders raise
def process_single_task(task)
  # Build a fresh client with a new HTTP handler from the current config.
  # NOTE(review): presumably so a forked worker does not share the parent's
  # HTTP connections - confirm against the executor's forking behavior.
  previous_config = @service.config.to_h
  previous_config.delete(:http_handler)
  @service = AWS::SimpleWorkflow.new(previous_config).client
  @service = @service.with_http_handler(AWS::Core::Http::NetHttpHandler.new(previous_config))
  begin
    execute(task)
  rescue CancellationException => e
    respond_activity_task_canceled_with_retry(task.task_token, e.message)
  rescue Exception => e
    @logger.error "Got an error, #{e.message}, while executing #{task.activity_type.name}"
    @logger.error "Full stack trace: #{e.backtrace}"
    respond_activity_task_failed_with_retry(task.task_token, e.message, e.backtrace)
  end
rescue Exception
  # Reached only when the client rebuild or one of the responders above
  # raised; log and let the caller see the original error.
  @logger.debug "Got into the other error mode"
  raise
ensure
  # Single release point: every path out of this method returns the permit
  # once, and a missing semaphore (method invoked without a prior poll) is
  # tolerated.
  @poll_semaphore.release if @poll_semaphore
end

#respond_activity_task_canceled(task_token, message) ⇒ Object



131
132
133
# File 'lib/aws/decider/task_poller.rb', line 131

# Tells the service that the task identified by task_token was canceled.
#
# @param task_token token of the task being reported on
# @param message sent to the service as the cancellation :details
# @return whatever the service client's call returns
def respond_activity_task_canceled(task_token, message)
  cancellation = {
    :task_token => task_token,
    :details => message
  }
  @service.respond_activity_task_canceled(cancellation)
end

#respond_activity_task_canceled_with_retry(task_token, message) ⇒ Object



124
125
126
127
128
129
# File 'lib/aws/decider/task_poller.rb', line 124

# Reports a canceled task, going straight to the service when no failure
# retrier is configured. Nothing is sent when @failure_retrier is set.
#
# @param task_token token of the task being reported on
# @param message cancellation details passed through unchanged
# @return the direct responder's result, or nil when a retrier is set
def respond_activity_task_canceled_with_retry(task_token, message)
  #TODO Set up other stuff to do if we have it
  return unless @failure_retrier.nil?
  respond_activity_task_canceled(task_token, message)
end

#respond_activity_task_failed(task_token, reason, details) ⇒ Object



135
136
137
138
# File 'lib/aws/decider/task_poller.rb', line 135

# Tells the service that the task identified by task_token failed.
#
# @param task_token token of the task being reported on
# @param reason converted with #to_s and sent as :reason
# @param details converted with #to_s and sent as :details
# @return whatever the service client's call returns
def respond_activity_task_failed(task_token, reason, details)
  @logger.debug "The task token to be reported on is #{task_token}"
  reason_text = reason.to_s
  details_text = details.to_s
  @service.respond_activity_task_failed(
    :task_token => task_token,
    :reason => reason_text,
    :details => details_text
  )
end

#respond_activity_task_failed_with_retry(task_token, reason, details) ⇒ Object



116
117
118
119
120
121
122
# File 'lib/aws/decider/task_poller.rb', line 116

# Reports a failed task, going straight to the service when no failure
# retrier is configured. Nothing is sent when @failure_retrier is set.
#
# @param task_token token of the task being reported on
# @param reason failure reason passed through unchanged
# @param details failure details passed through unchanged
# @return the direct responder's result, or nil when a retrier is set
def respond_activity_task_failed_with_retry(task_token, reason, details)
  #TODO Set up this variable
  #TODO Set up other stuff to do if we have it
  return unless @failure_retrier.nil?
  respond_activity_task_failed(task_token, reason, details)
end