Class: Aws::ActiveJob::SQS::Executor
- Inherits:
-
Object
- Object
- Aws::ActiveJob::SQS::Executor
- Defined in:
- lib/aws/active_job/sqs/executor.rb
Overview
CLI runner for polling for SQS ActiveJobs
Constant Summary collapse
- DEFAULTS =
{ min_threads: 0, max_threads: Integer(Concurrent.available_processor_count || Concurrent.processor_count), auto_terminate: true, idletime: 60, # 1 minute fallback_policy: :abort # Concurrent::RejectedExecutionError must be handled }.freeze
Class Method Summary collapse
Instance Method Summary collapse
- #execute(message) ⇒ Object
-
#initialize(options = {}) ⇒ Executor
constructor
A new instance of Executor.
- #shutdown(timeout = nil) ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Executor
Returns a new instance of Executor.
32 33 34 35 36 37 |
# File 'lib/aws/active_job/sqs/executor.rb', line 32 def initialize( = {}) @executor = Concurrent::ThreadPoolExecutor.new(DEFAULTS.merge()) @retry_standard_errors = [:retry_standard_errors] @logger = [:logger] || ActiveSupport::Logger.new($stdout) @task_complete = Concurrent::Event.new end |
Class Method Details
.clear_hooks ⇒ Object
27 28 29 |
# File 'lib/aws/active_job/sqs/executor.rb', line 27 def clear_hooks @lifecycle_hooks = nil end |
.lifecycle_hooks ⇒ Object
23 24 25 |
# File 'lib/aws/active_job/sqs/executor.rb', line 23 def lifecycle_hooks @lifecycle_hooks ||= Hash.new { |h, k| h[k] = [] } end |
.on_stop(&block) ⇒ Object
19 20 21 |
# File 'lib/aws/active_job/sqs/executor.rb', line 19 def on_stop(&block) lifecycle_hooks[:stop] << block end |
Instance Method Details
#execute(message) ⇒ Object
39 40 41 42 43 44 45 46 |
# File 'lib/aws/active_job/sqs/executor.rb', line 39 def execute() post_task() rescue Concurrent::RejectedExecutionError # no capacity, wait for a task to complete @task_complete.reset @task_complete.wait retry end |
#shutdown(timeout = nil) ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/aws/active_job/sqs/executor.rb', line 48 def shutdown(timeout = nil) run_hooks_for(:stop) @executor.shutdown clean_shutdown = @executor.wait_for_termination(timeout) if clean_shutdown @logger.info 'Clean shutdown complete. All executing jobs finished.' else @logger.info "Timeout (#{timeout}) exceeded. Some jobs may not have " \ 'finished cleanly. Unfinished jobs will not be removed from ' \ 'the queue and can be ru-run once their visibility timeout ' \ 'passes.' end end |