Module: Wayfarer::BatchCompletion

Defined in:
lib/wayfarer/batch_completion.rb

Overview

BatchCompletion tracks the completion of a batch of jobs. It does so by incrementing and decrementing a counter in Redis.

The counter is incremented when a job is first enqueued; it is not incremented again when a job is re-enqueued for a retry. The counter is decremented once a job is finished for good: either it was performed successfully, or it failed and its retries were stopped.

When the counter reaches zero, garbage collection deletes the Redis keys associated with the batch.

Constant Summary collapse

# Notification event names that .subscribe! registers this module for
# via ActiveSupport::Notifications.
EVENTS =
%w[enqueue.active_job perform.active_job retry_stopped.active_job].freeze

Class Method Summary collapse

Class Method Details

.call(name, _, _, _, data) ⇒ Object



23
24
25
26
27
28
29
30
31
32
# File 'lib/wayfarer/batch_completion.rb', line 23

# Notification callback: receives an instrumented event and forwards it to
# {handle} when the event's payload carries a Wayfarer job.
#
# @param name [String] name of the event that fired
# @param data [Hash] event payload; only `data[:job]` is read
# @return [Object, nil] whatever {handle} returns, or nil when the payload's
#   job is not a Wayfarer::Base
def call(name, _, _, _, data)
  job = data[:job]
  return unless job.is_a?(Wayfarer::Base)

  # The first job argument is treated as the task.
  task = job.arguments.first

  # On `enqueue.active_job` the middleware has not run yet, so the task may
  # not have a Redis pool assigned; fall back to the shared pool instance.
  task[:redis_pool] ||= Wayfarer::Redis::Pool.instance

  handle(name, job, task)
end

.fail!(counter) ⇒ Object



48
49
50
# File 'lib/wayfarer/batch_completion.rb', line 48

# Records a job whose retries were stopped by decrementing the batch counter.
#
# NOTE(review): unlike {succeed!}, this path does not run Wayfarer::GC when
# the counter reaches zero — confirm that this is intended.
#
# @param counter [#decrement] the batch counter
# @return [Object] whatever `counter.decrement` returns
def fail!(counter)
  counter.decrement
end

.handle(name, job, task) ⇒ Object



34
35
36
37
38
39
40
41
42
# File 'lib/wayfarer/batch_completion.rb', line 34

# Applies one event to the counter built from the task.
#
# * `enqueue.active_job`: increments, unless the job is a retry
# * `perform.active_job`: hands over to {succeed!} if the job succeeded
# * `retry_stopped.active_job`: hands over to {fail!}
#
# Any other event name is ignored.
#
# @param name [String] event name
# @param job [Object] the job the event refers to
# @param task [Object] the job's task; used to build the counter
# @return [Object, nil] result of the branch taken, or nil if nothing was done
def handle(name, job, task)
  batch_counter = Wayfarer::Redis::Counter.new(task)

  case name
  when "enqueue.active_job"
    # Retries are re-enqueued, but were already counted the first time.
    return if retry?(job)

    batch_counter.increment
  when "perform.active_job"
    return unless succeeded?(job, task)

    succeed!(task, batch_counter)
  when "retry_stopped.active_job"
    fail!(batch_counter)
  end
end

.retry?(job) ⇒ Boolean



52
53
54
# File 'lib/wayfarer/batch_completion.rb', line 52

# Tells whether the job has been executed before, i.e. whether enqueueing it
# now is a retry rather than a first enqueue.
#
# @param job [#executions] job exposing its execution count
# @return [Boolean] true when the execution count is greater than zero
def retry?(job)
  job.executions.positive?
end

.subscribe! ⇒ Object



19
20
21
# File 'lib/wayfarer/batch_completion.rb', line 19

# Registers this object as the ActiveSupport::Notifications subscriber for
# every event name listed in EVENTS.
#
# @return [Array<String>] EVENTS
def subscribe!
  EVENTS.each do |event_name|
    ActiveSupport::Notifications.subscribe(event_name, self)
  end
end

.succeed!(task, counter) ⇒ Object



44
45
46
# File 'lib/wayfarer/batch_completion.rb', line 44

# Records a successfully performed job: decrements the batch counter and,
# if that brings it to zero, runs garbage collection for the task.
#
# @param task [Object] passed through to Wayfarer::GC.run
# @param counter [#decrement] the batch counter
# @return [Object, nil] result of Wayfarer::GC.run, or nil when the counter
#   did not reach zero
def succeed!(task, counter)
  remaining = counter.decrement
  return unless remaining == 0

  Wayfarer::GC.run(task)
end

.succeeded?(job, task) ⇒ Boolean



56
57
58
# File 'lib/wayfarer/batch_completion.rb', line 56

# Tells whether the job's exception executions still equal the value stored
# on the task under :initial_exception_executions.
#
# @param job [#exception_executions] the performed job
# @param task [#[]] task holding :initial_exception_executions
# @return [Boolean] true when both values are equal
def succeeded?(job, task)
  current = job.exception_executions
  baseline = task[:initial_exception_executions]

  current == baseline
end