Class: AWS::Flow::ActivityTaskPoller

Inherits:
Object
  • Object
show all
Defined in:
lib/aws/decider/task_poller.rb

Instance Method Summary collapse

Constructor Details

#initialize(service, domain, task_list, activity_definition_map, options = nil) ⇒ ActivityTaskPoller

Returns a new instance of ActivityTaskPoller.



83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/aws/decider/task_poller.rb', line 83

# Sets up a poller for a single task list.
#
# @param service client object stored in @service
# @param domain domain object stored in @domain
# @param task_list task list stored in @task_list
# @param activity_definition_map lookup from activity type to the object
#   that implements it
# @param options optional object responding to +logger+ and
#   +execution_workers+; when nil (or when either value is missing) a
#   "debug" logger from Utilities::LogFactory and 20 workers are used
def initialize(service, domain, task_list, activity_definition_map, options=nil)
  @service, @domain, @task_list = service, domain, task_list
  @activity_definition_map = activity_definition_map

  supplied_logger = options ? options.logger : nil
  @logger = supplied_logger || Utilities::LogFactory.make_logger(self, "debug")

  # A missing or zero worker count falls back to 20.
  worker_limit = options ? options.execution_workers : nil
  worker_limit = 20 if worker_limit.nil? || worker_limit.zero?
  @executor = ForkingExecutor.new(:max_workers => worker_limit, :logger => @logger)
end

Instance Method Details

#execute(task) ⇒ Object



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/aws/decider/task_poller.rb', line 96

# Runs the implementation registered for the task's activity type and,
# unless the activity is marked for manual completion, reports the result
# as completed through @service.
#
# An ActivityFailureException raised while running is reported through
# respond_activity_task_failed_with_retry; any other error (including the
# RuntimeError raised for an unregistered activity type) propagates.
#
# @param task the activity task; read for activity_type, input and task_token
def execute(task)
  requested_type = task.activity_type
  begin
    # The context is built before the lookup, matching the order callers may rely on.
    execution_context = ActivityExecutionContext.new(@service, @domain, task)
    implementation = @activity_definition_map[requested_type]
    unless implementation
      requested_name = requested_type.name
      known_names = @activity_definition_map.keys.map(&:name).join(' ')
      raise "This activity worker was told to work on activity type #{requested_name}, but this activity worker only knows how to work on #{known_names}"
    end

    result = implementation.execute(task.input, execution_context)
    @logger.debug "Responding on task_token #{task.task_token} for task #{task}"
    unless implementation.execution_options.manual_completion
      @service.respond_activity_task_completed(:task_token => task.task_token, :result => result)
    end
  rescue ActivityFailureException => failure
    respond_activity_task_failed_with_retry(task.task_token, failure.message, failure.details)
  end
  # TODO: remaining completion handling
end

#poll_and_process_single_task(use_forking = true) ⇒ Object



161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/aws/decider/task_poller.rb', line 161

# Polls the task list once and dispatches the task that comes back, if any.
#
# @poll_semaphore (created on first use) is acquired before polling. When
# the poll raises or returns nothing, the semaphore is released here; once a
# task has been received, releasing it is left to process_single_task.
#
# @param use_forking [Boolean] when true the task is handed to @executor,
#   otherwise process_single_task is called directly
# @return [Boolean] true when a task was received and dispatched, false when
#   the poll raised or returned no task
def poll_and_process_single_task(use_forking = true)
  @poll_semaphore ||= SuspendableSemaphore.new
  @poll_semaphore.acquire
  @logger.debug "before the poll\n\n"
  begin
    task = @domain.activity_tasks.poll_for_single_task(@task_list)
    # Only describe the task when there is one: an empty poll returns nil and
    # must reach the "didn't get anything" branch below instead of raising
    # NoMethodError here and being misreported as a poll failure.
    if task
      @logger.debug "got a task, #{task.activity_type.name}"
      @logger.debug "The task token I got was: #{task.task_token}"
    end
  rescue Exception => e
    # NOTE(review): rescuing Exception also swallows Interrupt/SystemExit;
    # kept as-is for backward compatibility -- consider StandardError.
    @logger.debug "I have not been able to poll successfully, and am now bailing out, with error #{e}"
    @poll_semaphore.release
    return false
  end
  if task.nil?
    @logger.debug "Still polling at #{Time.now}, but didn't get anything"
    @poll_semaphore.release
    return false
  end
  if use_forking
    @executor.execute { process_single_task(task) }
  else
    process_single_task(task)
  end
  @logger.debug "finished executing the task"
  true
end

#process_single_task(task) ⇒ Object



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/aws/decider/task_poller.rb', line 137

# Executes one activity task and reports cancellation or failure for it.
#
# @service is first replaced with a newly constructed SimpleWorkflow client
# (presumably so a forked worker does not reuse the parent's connection --
# TODO confirm). A CancellationException from execute is reported as
# canceled; any other exception is logged and reported as failed.
#
# @poll_semaphore is released exactly once on every path, including when
# building the client or reporting the outcome itself raises. Such an error
# is logged and re-raised to the caller.
#
# @param task the activity task; read for task_token and activity_type
def process_single_task(task)
  @service = AWS::SimpleWorkflow.new.client.with_http_handler(AWS::Core::Http::NetHttpHandler.new(:ssl_ca_file => AWS.config.ssl_ca_file))
  begin
    execute(task)
  rescue CancellationException => e
    respond_activity_task_canceled_with_retry(task.task_token, e.message)
  rescue Exception => e
    @logger.error "Got an error, #{e.message}, while executing #{task.activity_type.name}"
    @logger.error "Full stack trace: #{e.backtrace}"
    respond_activity_task_failed_with_retry(task.task_token, e.message, e.backtrace)
  end
rescue Exception
  # Reached when client construction or the reporting calls above raise.
  @logger.debug "Got into the other error mode"
  raise
ensure
  # Single release point: the semaphore was acquired once by the poller, so
  # it must not be released a second time on the error path.
  @poll_semaphore.release
end

#respond_activity_task_canceled(task_token, message) ⇒ Object



128
129
130
# File 'lib/aws/decider/task_poller.rb', line 128

# Tells the service that the task identified by +task_token+ was canceled,
# passing +message+ along as the details.
#
# @return whatever @service.respond_activity_task_canceled returns
def respond_activity_task_canceled(task_token, message)
  cancellation = { :task_token => task_token, :details => message }
  @service.respond_activity_task_canceled(cancellation)
end

#respond_activity_task_canceled_with_retry(task_token, message) ⇒ Object



121
122
123
124
125
126
# File 'lib/aws/decider/task_poller.rb', line 121

# Reports a cancellation via respond_activity_task_canceled, but only while
# no @failure_retrier is set; with a retrier present nothing is sent and nil
# is returned.
def respond_activity_task_canceled_with_retry(task_token, message)
  # TODO: define what happens when a failure retrier is configured
  respond_activity_task_canceled(task_token, message) if @failure_retrier.nil?
end

#respond_activity_task_failed(task_token, reason, details) ⇒ Object



132
133
134
135
# File 'lib/aws/decider/task_poller.rb', line 132

# Logs the task token, then tells the service the task failed. +reason+ and
# +details+ are converted with to_s before being sent.
#
# @return whatever @service.respond_activity_task_failed returns
def respond_activity_task_failed(task_token, reason, details)
  @logger.debug "The task token to be reported on is #{task_token}"
  reason_text = reason.to_s
  details_text = details.to_s
  @service.respond_activity_task_failed(:task_token => task_token, :reason => reason_text, :details => details_text)
end

#respond_activity_task_failed_with_retry(task_token, reason, details) ⇒ Object



113
114
115
116
117
118
119
# File 'lib/aws/decider/task_poller.rb', line 113

# Reports a failure via respond_activity_task_failed, but only while no
# @failure_retrier is set; with a retrier present nothing is sent and nil is
# returned.
def respond_activity_task_failed_with_retry(task_token, reason, details)
  # TODO: set up @failure_retrier and define what happens when it is present
  return unless @failure_retrier.nil?

  respond_activity_task_failed(task_token, reason, details)
end