Class: Aws::Rails::SqsActiveJob::Executor

Inherits:
Object
  • Object
show all
Defined in:
lib/aws/rails/sqs_active_job/executor.rb

Overview

CLI runner for polling for SQS ActiveJobs

Constant Summary collapse

# Baseline thread-pool options. Options passed to #initialize are merged
# over these before being handed to Concurrent::ThreadPoolExecutor.new.
DEFAULTS = {
  min_threads: 0,
  max_threads: Concurrent.processor_count,
  auto_terminate: true,
  idletime: 60, # 1 minute
  # :caller_runs slows down the producer thread.
  # TODO: Consider catching the exception and sleeping instead of using :caller_runs
  fallback_policy: :caller_runs
}.freeze

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Executor

Returns a new instance of Executor.



19
20
21
22
23
# File 'lib/aws/rails/sqs_active_job/executor.rb', line 19

# Builds the thread pool and captures the executor's own settings.
#
# @param options [Hash] merged over DEFAULTS and passed as a whole to
#   Concurrent::ThreadPoolExecutor.new. Two keys are also read here:
#   :retry_standard_errors (stored as-is) and :logger (falls back to an
#   ActiveSupport::Logger writing to $stdout when absent or nil).
def initialize(options = {})
  pool_options = DEFAULTS.merge(options)
  @executor = Concurrent::ThreadPoolExecutor.new(pool_options)
  @retry_standard_errors = options[:retry_standard_errors]

  supplied_logger = options[:logger]
  @logger = supplied_logger || ActiveSupport::Logger.new($stdout)
end

Instance Method Details

#execute(message) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/aws/rails/sqs_active_job/executor.rb', line 25

# Posts a message to the thread pool, where it is wrapped in a JobRunner
# and run.
#
# Outcomes inside the pool task:
# * success - the message is deleted.
# * Aws::Json::ParseError - the error is logged; the message is not deleted.
# * any other StandardError - the message is deleted, unless
#   retry_standard_errors is enabled and either no job could be built or
#   the job reports no exception_executions?; in that case the message is
#   left in the queue.
#
# @param message [Object] must respond to #delete and #data (with #body);
#   presumably an Aws::SQS::Message - confirm against the caller.
# @return [Object] whatever @executor.post returns.
def execute(message)
  # The message is handed to the task as a block argument; a distinct name
  # avoids shadowing the method parameter.
  @executor.post(message) do |msg|
    job = JobRunner.new(msg)
    @logger.info("Running job: #{job.id}[#{job.class_name}]")
    job.run
    msg.delete
  rescue Aws::Json::ParseError => e
    @logger.error "Unable to parse message body: #{msg.data.body}. Error: #{e}."
  rescue StandardError => e
    # job is nil when JobRunner.new itself raised.
    job_msg = job ? "#{job.id}[#{job.class_name}]" : 'unknown job'
    @logger.info "Error processing job #{job_msg}: #{e}"
    @logger.debug e.backtrace.join("\n")

    # Nil-safe: with no job there is nothing Rails could have retried, so
    # the message is kept rather than raising NoMethodError on nil here.
    if @retry_standard_errors && !job&.exception_executions?
      @logger.info(
        'retry_standard_errors is enabled and job has not ' \
        "been retried by Rails.  Leaving #{job_msg} in the queue."
      )
    else
      msg.delete
    end
  end
end

#shutdown(timeout = nil) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/aws/rails/sqs_active_job/executor.rb', line 49

# Shuts the thread pool down and waits for it to terminate, then logs
# whether termination completed within the timeout.
#
# @param timeout [Numeric, nil] passed straight to
#   @executor.wait_for_termination.
# @return [Object] the return value of the logger call.
def shutdown(timeout = nil)
  @executor.shutdown
  clean_shutdown = @executor.wait_for_termination(timeout)
  if clean_shutdown
    @logger.info 'Clean shutdown complete.  All executing jobs finished.'
  else
    @logger.info "Timeout (#{timeout}) exceeded.  Some jobs may not have " \
                 'finished cleanly.  Unfinished jobs will not be removed from ' \
                 'the queue and can be re-run once their visibility timeout ' \
                 'passes.'
  end
end