Module: Wayfarer::BatchCompletion
- Defined in: lib/wayfarer/batch_completion.rb
Overview
BatchCompletion tracks the completion of a batch of jobs. It does so by incrementing and decrementing a counter in Redis.
The counter is incremented when a job is first enqueued; re-enqueueing a job for a retry does not increment it again. The counter is decremented once a job reaches a final outcome: it either succeeds, or it fails and exhausts its retries.
When the counter reaches zero, garbage collection deletes the Redis keys associated with the batch.
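The counting rules above can be sketched without Redis. The following is a minimal illustration, not the library's implementation: `InMemoryBatchCounter` and its method names are hypothetical stand-ins, and the `collected` flag merely marks the point at which garbage collection would run.

```ruby
# Hypothetical in-memory stand-in for the Redis-backed counter (illustration only).
class InMemoryBatchCounter
  attr_reader :value, :collected

  def initialize
    @value = 0
    @collected = false
  end

  # A job was enqueued. Only the first enqueue counts; a retry
  # (executions > 0) must not be counted a second time.
  def enqueued(executions:)
    @value += 1 if executions.zero?
    @value
  end

  # A job reached a final outcome (success, or retries exhausted).
  # When the counter hits zero, the batch is complete.
  def finished
    @value -= 1
    @collected = true if @value.zero?
    @value
  end
end

counter = InMemoryBatchCounter.new
counter.enqueued(executions: 0) # job A, first enqueue
counter.enqueued(executions: 0) # job B, first enqueue
counter.enqueued(executions: 1) # job B re-enqueued for a retry, not counted
counter.finished                # job A succeeded
counter.finished                # job B exhausted its retries; batch complete
```

Because retries are never counted twice, each job contributes exactly one increment and one decrement, so the counter reaches zero only after every job in the batch has reached a final outcome.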
Constant Summary
- EVENTS =
%w[enqueue.active_job perform.active_job retry_stopped.active_job].freeze
Class Method Summary
- .call(name, _, _, _, data) ⇒ Object
- .fail!(counter) ⇒ Object
- .handle(name, job, task) ⇒ Object
- .retry?(job) ⇒ Boolean
- .subscribe! ⇒ Object
- .succeed!(task, counter) ⇒ Object
- .succeeded?(job, task) ⇒ Boolean
Class Method Details
.call(name, _, _, _, data) ⇒ Object
    # File 'lib/wayfarer/batch_completion.rb', line 23

    def call(name, _, _, _, data)
      return unless (job = data[:job]).is_a?(Wayfarer::Base)

      task = job.arguments.first

      # In the case of `enqueue.active_job` middleware hasn't executed yet
      task[:redis_pool] ||= Wayfarer::Redis::Pool.instance

      handle(name, job, task)
    end
.fail!(counter) ⇒ Object
    # File 'lib/wayfarer/batch_completion.rb', line 48

    def fail!(counter)
      counter.decrement
    end
.handle(name, job, task) ⇒ Object
    # File 'lib/wayfarer/batch_completion.rb', line 34

    def handle(name, job, task)
      counter = Wayfarer::Redis::Counter.new(task)

      case name
      when "enqueue.active_job" then counter.increment unless retry?(job)
      when "perform.active_job" then succeed!(task, counter) if succeeded?(job, task)
      when "retry_stopped.active_job" then fail!(counter)
      end
    end
.retry?(job) ⇒ Boolean
    # File 'lib/wayfarer/batch_completion.rb', line 52

    def retry?(job)
      job.executions > 0
    end
.subscribe! ⇒ Object
    # File 'lib/wayfarer/batch_completion.rb', line 19

    def subscribe!
      EVENTS.each { |event| ActiveSupport::Notifications.subscribe(event, self) }
    end
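Passing `self` to `subscribe` works because the module responds to `.call` with five arguments (event name, start time, finish time, identifier, payload). The sketch below illustrates that shape with a stdlib-only stand-in so it runs without Rails; `TinyNotifier` and `RecordingSubscriber` are hypothetical names and are not part of ActiveSupport or Wayfarer.

```ruby
# Hypothetical, stdlib-only stand-in for a notification bus. It is NOT
# ActiveSupport::Notifications; it only imitates the five-argument callback.
class TinyNotifier
  def initialize
    @subscribers = Hash.new { |hash, key| hash[key] = [] }
  end

  # Register any object that responds to #call for the given event name.
  def subscribe(event, subscriber)
    @subscribers[event] << subscriber
  end

  # Deliver name, start, finish, id and payload to each subscriber of the event.
  def publish(event, payload)
    now = Time.now
    @subscribers[event].each do |subscriber|
      subscriber.call(event, now, now, "id-1", payload)
    end
  end
end

# A module with a singleton #call, analogous in shape to BatchCompletion.call.
module RecordingSubscriber
  SEEN = []

  # Timing arguments and the identifier are ignored, as in the source above.
  def self.call(name, _start, _finish, _id, data)
    SEEN << [name, data[:job]]
  end
end

notifier = TinyNotifier.new
%w[enqueue.active_job perform.active_job].each do |event|
  notifier.subscribe(event, RecordingSubscriber)
end

notifier.publish("enqueue.active_job", { job: :example_job })
notifier.publish("retry_stopped.active_job", { job: :example_job }) # no subscriber, ignored
```

Only events that were subscribed to reach the subscriber, which is why the real module subscribes once per entry in `EVENTS`.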
.succeed!(task, counter) ⇒ Object
    # File 'lib/wayfarer/batch_completion.rb', line 44

    def succeed!(task, counter)
      Wayfarer::GC.run(task) if counter.decrement == 0
    end
.succeeded?(job, task) ⇒ Boolean
    # File 'lib/wayfarer/batch_completion.rb', line 56

    def succeeded?(job, task)
      job.exception_executions == task[:initial_exception_executions]
    end