Class: Aws::Rails::SqsActiveJob::Executor
- Inherits:
- Object
- Defined in:
- lib/aws/rails/sqs_active_job/executor.rb
Overview
Executes SQS ActiveJobs on a thread pool on behalf of the CLI runner that polls for them.
Constant Summary
- DEFAULTS =
{
  min_threads: 0,
  max_threads: Concurrent.processor_count,
  auto_terminate: true,
  idletime: 60, # 1 minute
  fallback_policy: :caller_runs # slow down the producer thread
  # TODO: Consider catching the exception and sleeping instead of using :caller_runs
}.freeze
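The constructor merges caller-supplied options over DEFAULTS, so any key the caller passes wins over the default. The following is a minimal sketch of that merge only, not the library's code: SKETCH_DEFAULTS and pool_options are hypothetical names, and Etc.nprocessors stands in for Concurrent.processor_count so the example runs without the concurrent-ruby gem.

```ruby
require 'etc'

# Hypothetical copy of the documented defaults. Etc.nprocessors is an
# assumption standing in for Concurrent.processor_count.
SKETCH_DEFAULTS = {
  min_threads: 0,
  max_threads: Etc.nprocessors,
  auto_terminate: true,
  idletime: 60, # seconds
  fallback_policy: :caller_runs
}.freeze

# Hypothetical helper: returns a new hash in which caller overrides
# replace the matching defaults. The frozen constant is never mutated.
def pool_options(overrides = {})
  SKETCH_DEFAULTS.merge(overrides)
end

opts = pool_options(max_threads: 4)
```

Here opts[:max_threads] is 4 while the remaining keys keep their default values.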
Instance Method Summary
- #execute(message) ⇒ Object
- #initialize(options = {}) ⇒ Executor (constructor)
  A new instance of Executor.
- #shutdown(timeout = nil) ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Executor
Returns a new instance of Executor.
# File 'lib/aws/rails/sqs_active_job/executor.rb', line 19
def initialize(options = {})
  @executor = Concurrent::ThreadPoolExecutor.new(DEFAULTS.merge(options))
  @retry_standard_errors = options[:retry_standard_errors]
  @logger = options[:logger] || ActiveSupport::Logger.new($stdout)
end
Instance Method Details
#execute(message) ⇒ Object
# File 'lib/aws/rails/sqs_active_job/executor.rb', line 25
def execute(message)
  @executor.post(message) do |message|
    job = JobRunner.new(message)
    @logger.info("Running job: #{job.id}[#{job.class_name}]")
    job.run
    message.delete
  rescue Aws::Json::ParseError => e
    @logger.error "Unable to parse message body: #{message.data.body}. Error: #{e}."
  rescue StandardError => e
    job_msg = job ? "#{job.id}[#{job.class_name}]" : 'unknown job'
    @logger.info "Error processing job #{job_msg}: #{e}"
    @logger.debug e.backtrace.join("\n")
    if @retry_standard_errors && !job.exception_executions?
      @logger.info(
        'retry_standard_errors is enabled and job has not ' \
        "been retried by Rails. Leaving #{job_msg} in the queue."
      )
    else
      message.delete
    end
  end
end
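The StandardError branch above decides whether a failed message is deleted or left in the queue for redelivery. A minimal sketch of that decision in isolation follows; FakeMessage, delete_after_failure? and handle_failure are hypothetical names introduced for illustration, and retried_by_rails stands in for the result of job.exception_executions?.

```ruby
# Hypothetical stand-in for an SQS message. It only records whether
# delete was called.
FakeMessage = Struct.new(:body, :deleted) do
  def delete
    self.deleted = true
  end
end

# Hypothetical helper mirroring the condition in the rescue branch:
# the message stays in the queue only when retry_standard_errors is
# enabled and Rails has not already retried the job.
def delete_after_failure?(retry_standard_errors, retried_by_rails)
  !(retry_standard_errors && !retried_by_rails)
end

# Applies the decision to a message and returns the message.
def handle_failure(message, retry_standard_errors:, retried_by_rails:)
  message.delete if delete_after_failure?(retry_standard_errors, retried_by_rails)
  message
end

left = handle_failure(FakeMessage.new('{}', false),
                      retry_standard_errors: true, retried_by_rails: false)
gone = handle_failure(FakeMessage.new('{}', false),
                      retry_standard_errors: false, retried_by_rails: false)
```

In this sketch left.deleted stays false, so the message would become visible again after its visibility timeout, while gone.deleted is true.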
#shutdown(timeout = nil) ⇒ Object
# File 'lib/aws/rails/sqs_active_job/executor.rb', line 49
def shutdown(timeout = nil)
  @executor.shutdown
  clean_shutdown = @executor.wait_for_termination(timeout)
  if clean_shutdown
    @logger.info 'Clean shutdown complete. All executing jobs finished.'
  else
    @logger.info "Timeout (#{timeout}) exceeded. Some jobs may not have " \
                 'finished cleanly. Unfinished jobs will not be removed from ' \
                 'the queue and can be re-run once their visibility timeout ' \
                 'passes.'
  end
end
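The shutdown method waits up to timeout seconds for in-flight jobs and reports whether they all finished. The sketch below illustrates the same wait-with-deadline idea using plain Ruby threads rather than Concurrent::ThreadPoolExecutor; wait_for_workers is a hypothetical helper, not part of the library, and it only approximates the true/false result of wait_for_termination.

```ruby
# Hypothetical helper: waits for every thread to finish, sharing one
# overall deadline. A nil timeout waits indefinitely. Returns true when
# all threads finished in time and false otherwise.
def wait_for_workers(threads, timeout = nil)
  now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  deadline = timeout && (now.call + timeout)

  threads.all? do |thread|
    remaining = deadline && [deadline - now.call, 0].max
    # Thread#join returns the thread on completion and nil on timeout.
    thread.join(remaining)
  end
end
```

A caller could log a clean-shutdown message when this returns true and a timeout message otherwise, as the method above does with the executor's result.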