Class: Aws::Rails::SqsActiveJob::Executor
- Inherits: Object
  - Object
  - Aws::Rails::SqsActiveJob::Executor
- Defined in: lib/aws/rails/sqs_active_job/executor.rb
Overview
CLI runner for polling for SQS ActiveJobs
Constant Summary
- DEFAULTS =
{
  min_threads: 0,
  max_threads: Concurrent.processor_count,
  auto_terminate: true,
  idletime: 60, # 1 minute
  fallback_policy: :caller_runs # slow down the producer thread
}.freeze
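The constructor body is not shown on this page, so the following is only a sketch of how caller-supplied options might be layered over these defaults. It assumes a plain `Hash#merge`; `pool_options` is a hypothetical helper, and `Etc.nprocessors` stands in for `Concurrent.processor_count` so the snippet runs with the standard library alone.

```ruby
require 'etc'

# Stand-in for the documented DEFAULTS constant. Etc.nprocessors is an
# assumed stdlib substitute for Concurrent.processor_count.
DEFAULTS = {
  min_threads: 0,
  max_threads: Etc.nprocessors,
  auto_terminate: true,
  idletime: 60, # 1 minute
  fallback_policy: :caller_runs # slow down the producer thread
}.freeze

# Hypothetical helper: caller-supplied options win over the defaults.
# Hash#merge returns a new hash, so the frozen DEFAULTS is never mutated.
def pool_options(options = {})
  DEFAULTS.merge(options)
end

opts = pool_options(max_threads: 4, idletime: 30)
puts opts.inspect
```

Keys the caller does not pass, such as `fallback_policy`, keep their default values.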
Instance Method Summary
- #execute(message) ⇒ Object
  TODO: Consider catching the exception and sleeping instead of using :caller_runs.
- #initialize(options = {}) ⇒ Executor (constructor)
  A new instance of Executor.
- #shutdown(timeout = nil) ⇒ Object
Constructor Details
Instance Method Details
#execute(message) ⇒ Object
TODO: Consider catching the exception and sleeping instead of using :caller_runs
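For context on the TODO: with a `:caller_runs` fallback policy, a task that the pool rejects is run on the thread that posted it, which slows the producer down. The sketch below illustrates that idea with the standard library only. `TinyPool` is a hypothetical class written for this example and is not how concurrent-ruby implements the policy.

```ruby
# Hypothetical bounded pool illustrating a caller-runs fallback.
class TinyPool
  def initialize(workers:, queue_size:)
    @queue = SizedQueue.new(queue_size)
    @threads = Array.new(workers) do
      Thread.new do
        # Queue#pop returns nil once the queue is closed and drained.
        while (task = @queue.pop)
          task.call
        end
      end
    end
  end

  # Try to enqueue without blocking. If the queue is full, run the task
  # on the calling thread instead of rejecting it.
  def post(&task)
    @queue.push(task, true) # non_block = true raises ThreadError when full
    :queued
  rescue ThreadError
    task.call
    :caller_ran
  end

  def shutdown
    @queue.close
    @threads.each(&:join)
  end
end

# Zero workers keeps the demo deterministic: nothing drains the queue,
# so the second post always finds it full.
pool = TinyPool.new(workers: 0, queue_size: 1)
ran_on_caller = []
first = pool.post { :never_drained_in_this_demo }
second = pool.post { ran_on_caller << Thread.current }
pool.shutdown
```

Here `first` is queued and `second` runs immediately on the posting thread, which is the back-pressure effect the `DEFAULTS` comment describes.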
# File 'lib/aws/rails/sqs_active_job/executor.rb', line 25
def execute(message)
  @executor.post(message) do |message|
    begin
      job = JobRunner.new(message)
      @logger.info("Running job: #{job.id}[#{job.class_name}]")
      job.run
      # delete the message only after the job has run successfully
      message.delete
    rescue Aws::Json::ParseError => e
      @logger.error "Unable to parse message body: #{message.data.body}. Error: #{e}."
    rescue StandardError => e
      # message will not be deleted and will be retried
      job_msg = job ? "#{job.id}[#{job.class_name}]" : 'unknown job'
      @logger.info "Error processing job #{job_msg}: #{e}"
      @logger.debug e.backtrace.join("\n")
    end
  end
end
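The key behavior in `#execute` is that a message is deleted only after its job runs without raising, so a failed job stays on the queue for redelivery. A minimal standalone sketch of that pattern follows. `FakeMessage` and `process` are hypothetical names introduced for illustration, and no real SQS client is involved.

```ruby
require 'logger'
require 'stringio'

# Hypothetical stand-in for an SQS message; only #delete is modeled.
FakeMessage = Struct.new(:body, :deleted) do
  def delete
    self.deleted = true
  end
end

# Hypothetical processing step: run the work, delete on success, and
# leave the message untouched (so it can be redelivered) on any error.
def process(message, logger)
  yield message.body
  message.delete
  true
rescue StandardError => e
  logger.info "Error processing job: #{e}"
  false
end

log = StringIO.new
logger = Logger.new(log)

ok = FakeMessage.new('{"job":"ok"}', false)
bad = FakeMessage.new('{"job":"bad"}', false)

process(ok, logger) { |body| body.length }
process(bad, logger) { |_body| raise ArgumentError, 'boom' }
```

After this runs, `ok.deleted` is true while `bad.deleted` is still false, and the error is recorded in the log.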
#shutdown(timeout = nil) ⇒ Object
# File 'lib/aws/rails/sqs_active_job/executor.rb', line 43
def shutdown(timeout = nil)
  @executor.shutdown
  clean_shutdown = @executor.wait_for_termination(timeout)
  if clean_shutdown
    @logger.info 'Clean shutdown complete. All executing jobs finished.'
  else
    @logger.info "Timeout (#{timeout}) exceeded. Some jobs may not have"\
      " finished cleanly. Unfinished jobs will not be removed from"\
      " the queue and can be re-run once their visibility timeout"\
      " passes."
  end
end
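The shutdown flow waits up to `timeout` seconds and reports whether every job finished. The sketch below reproduces that wait-with-deadline idea using plain threads. `wait_for_workers` is a hypothetical helper, not part of this class or of concurrent-ruby. As the usage of `timeout = nil` above suggests, a `nil` timeout is treated as waiting indefinitely.

```ruby
# Hypothetical helper: wait up to `timeout` seconds in total for all
# threads and return true only if every one of them finished.
def wait_for_workers(threads, timeout = nil)
  now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  deadline = timeout && (now.call + timeout)
  threads.all? do |thread|
    remaining = deadline && [deadline - now.call, 0].max
    # Thread#join returns nil if the thread is still running when the
    # limit expires; a nil limit waits indefinitely.
    thread.join(remaining)
  end
end

gate = Queue.new
fast = Thread.new { :done }
slow = Thread.new { gate.pop } # blocks until something is pushed

clean_fast = wait_for_workers([fast], 1)
clean_slow = wait_for_workers([slow], 0.05)

gate.push(:go) # release the blocked worker so the script exits cleanly
slow.join
```

`clean_fast` corresponds to the "Clean shutdown complete" branch and `clean_slow` to the timeout branch, where unfinished work is left to be retried.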