Class: Delayed::JobGroups::JobGroup

Inherits:
ActiveRecord::Base
  • Object
show all
Defined in:
lib/delayed/job_groups/job_group.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.check_for_completion(job_group_id, skip_pending_jobs_check: false) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/delayed/job_groups/job_group.rb', line 73

# Completes the job group identified by +job_group_id+ if it is ready.
#
# @param job_group_id the id used to look the job group up
# @param skip_pending_jobs_check [Boolean] when truthy, the cheap
#   +has_pending_jobs?+ pre-check is bypassed and the group is always
#   loaded and locked
# @return whatever the group's +complete+ returns when it runs, otherwise nil
def self.check_for_completion(job_group_id, skip_pending_jobs_check: false)
  # Cheap pre-check: a group that still has pending jobs cannot be complete,
  # so skip loading and locking its row altogether.
  unless skip_pending_jobs_check
    return if has_pending_jobs?(job_group_id)
  end

  transaction do
    # Whichever finished job first sees the group is ready queues the
    # completion job and destroys the group. Any later caller may therefore
    # find no row at all, which is why the lookup result is allowed to be nil.
    locked_group = where(id: job_group_id).lock(true).first
    if locked_group&.send(:ready_for_completion?)
      locked_group.send(:complete)
    end
  end
end

.has_pending_jobs?(job_group_ids) ⇒ Boolean

rubocop:disable Naming/PredicateName

Returns:

  • (Boolean)


87
88
89
90
91
92
# File 'lib/delayed/job_groups/job_group.rb', line 87

# Reports whether any Delayed::Job row with a nil +failed_at+ belongs to one
# of the given job groups.
#
# @param job_group_ids a single id, a list of ids, or nil (coerced with Array())
# @return [Boolean] false without querying when no ids are given
def self.has_pending_jobs?(job_group_ids) # rubocop:disable Naming/PredicateName
  ids = Array(job_group_ids)

  if ids.empty?
    false
  else
    Delayed::Job.where(job_group_id: ids, failed_at: nil).exists?
  end
end

Instance Method Details

#cancel ⇒ Object



64
65
66
67
# File 'lib/delayed/job_groups/job_group.rb', line 64

# Cancels the job group: enqueues the configured cancellation job, if there
# is one, and then destroys the group.
#
# @return the result of +destroy+
def cancel
  cancellation_job = on_cancellation_job

  if cancellation_job
    # Fall back to an empty options hash when none were configured.
    Delayed::Job.enqueue(cancellation_job, on_cancellation_job_options || {})
  end

  destroy
end

#check_for_completion(skip_pending_jobs_check: false) ⇒ Object



69
70
71
# File 'lib/delayed/job_groups/job_group.rb', line 69

# Instance-level convenience wrapper: forwards this record's id and the
# +skip_pending_jobs_check+ flag to the class-level +check_for_completion+.
#
# @param skip_pending_jobs_check [Boolean] passed through unchanged
# @return whatever the class-level method returns
def check_for_completion(skip_pending_jobs_check: false)
  group_class = self.class
  group_class.check_for_completion(id, skip_pending_jobs_check: skip_pending_jobs_check)
end

#enqueue(job, options = {}) ⇒ Object



48
49
50
51
52
# File 'lib/delayed/job_groups/job_group.rb', line 48

# Enqueues +job+ as a member of this job group.
#
# The caller's +options+ hash is left untouched; a copy is passed to
# Delayed::Job.enqueue with +:job_group_id+ set to this group's id and
# +:blocked+ set to the group's current +blocked?+ value (overriding any
# values the caller supplied for those keys).
#
# @param job the job object handed to Delayed::Job.enqueue
# @param options [Hash] extra enqueue options
# @return the result of Delayed::Job.enqueue
def enqueue(job, options = {})
  Delayed::Job.enqueue(job, options.merge(job_group_id: id, blocked: blocked?))
end

#mark_queueing_complete ⇒ Object



39
40
41
42
43
44
45
46
# File 'lib/delayed/job_groups/job_group.rb', line 39

# Flags the group as having finished queueing jobs and, if that makes the
# group ready, completes it. All of this happens inside +with_lock+.
#
# @raise [RuntimeError] if the group was already marked queueing-complete
# @return the result of +complete+ when it runs, otherwise nil
def mark_queueing_complete
  with_lock do
    if queueing_complete?
      raise 'JobGroup has already completed queueing'
    end

    update_column(:queueing_complete, true)

    # The group may already have no outstanding work, in which case it can
    # be completed right away.
    if ready_for_completion?
      complete
    end
  end
end

#unblock ⇒ Object



54
55
56
57
58
59
60
61
62
# File 'lib/delayed/job_groups/job_group.rb', line 54

# Unblocks the group and its active jobs. Does nothing when the group is not
# currently blocked.
#
# Inside +with_lock+ it clears the group's +blocked+ column, clears +blocked+
# on every active job while setting their +run_at+ to
# Delayed::Job.db_time_now, and completes the group if it is ready.
#
# @return the result of +complete+ when it runs, otherwise nil
def unblock
  if blocked?
    with_lock do
      update_column(:blocked, false)

      release_attrs = { blocked: false, run_at: Delayed::Job.db_time_now }
      active_jobs.update_all(release_attrs)

      if ready_for_completion?
        complete
      end
    end
  end
end