Class: GoodJob::Batch

Inherits:
Object
  • Object
show all
Includes:
GlobalID::Identification
Defined in:
app/models/good_job/batch.rb

Overview

NOTE: This class delegates to BatchRecord and is intended to be the public interface for Batches.

Constant Summary collapse

PROTECTED_PROPERTIES =
i[
  on_finish
  on_success
  on_discard
  callback_queue_name
  callback_priority
  description
  properties
].freeze

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(_record: nil, **properties) ⇒ Batch

rubocop:disable Lint/UnderscorePrefixedVariableName



86
87
88
89
# File 'app/models/good_job/batch.rb', line 86

def initialize(_record: nil, **properties) # rubocop:disable Lint/UnderscorePrefixedVariableName
  self.record = _record || BatchRecord.new
  assign_properties(properties)
end

Class Method Details

.enqueue(active_jobs = [], **properties, &block) ⇒ GoodJob::BatchRecord

Create a new batch and enqueue it



58
59
60
61
62
# File 'app/models/good_job/batch.rb', line 58

def self.enqueue(active_jobs = [], **properties, &block)
  new.tap do |batch|
    batch.enqueue(active_jobs, **properties, &block)
  end
end

.find(id) ⇒ Object



68
69
70
# File 'app/models/good_job/batch.rb', line 68

def self.find(id)
  new _record: BatchRecord.find(id)
end

.primary_keyObject



64
65
66
# File 'app/models/good_job/batch.rb', line 64

def self.primary_key
  :id
end

.within_thread(batch_id: nil, batch_callback_id: nil) ⇒ Object

Helper method to enqueue jobs and assign them to a batch



73
74
75
76
77
78
79
80
81
82
83
84
# File 'app/models/good_job/batch.rb', line 73

def self.within_thread(batch_id: nil, batch_callback_id: nil)
  original_batch_id = current_batch_id
  original_batch_callback_id = current_batch_callback_id

  self.current_batch_id = batch_id
  self.current_batch_callback_id = batch_callback_id

  yield
ensure
  self.current_batch_id = original_batch_id
  self.current_batch_callback_id = original_batch_callback_id
end

Instance Method Details

#_recordObject



178
179
180
# File 'app/models/good_job/batch.rb', line 178

def _record
  record
end

#active_jobsObject



163
164
165
# File 'app/models/good_job/batch.rb', line 163

def active_jobs
  record.jobs.map(&:active_job)
end

#add(active_jobs = nil, &block) ⇒ Array<ActiveJob::Base>

Enqueue jobs and add them to the batch



132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'app/models/good_job/batch.rb', line 132

def add(active_jobs = nil, &block)
  record.save if record.new_record?

  buffer = Bulk::Buffer.new
  buffer.add(active_jobs)
  buffer.capture(&block) if block

  self.class.within_thread(batch_id: id) do
    buffer.enqueue
  end

  buffer.active_jobs
end

#assign_properties(properties) ⇒ Object



171
172
173
174
175
176
# File 'app/models/good_job/batch.rb', line 171

def assign_properties(properties)
  properties = properties.dup
  batch_attrs = PROTECTED_PROPERTIES.index_with { |key| properties.delete(key) }.compact
  record.assign_attributes(batch_attrs)
  self.properties.merge!(properties)
end

#callback_active_jobsObject



167
168
169
# File 'app/models/good_job/batch.rb', line 167

def callback_active_jobs
  record.callback_jobs.map(&:active_job)
end

#enqueue(active_jobs = [], **properties, &block) ⇒ Array<ActiveJob::Base>



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'app/models/good_job/batch.rb', line 92

def enqueue(active_jobs = [], **properties, &block)
  assign_properties(properties)
  if record.new_record?
    record.save!
  else
    record.transaction do
      record.with_advisory_lock(function: "pg_advisory_xact_lock") do
        record.enqueued_at_will_change!
        record.jobs_finished_at_will_change! if GoodJob::BatchRecord.jobs_finished_at_migrated?
        record.finished_at_will_change!

        update_attributes = { discarded_at: nil, finished_at: nil }
        update_attributes[:jobs_finished_at] = nil if GoodJob::BatchRecord.jobs_finished_at_migrated?
        record.update!(**update_attributes)
      end
    end
  end

  active_jobs = add(active_jobs, &block)

  Rails.application.executor.wrap do
    buffer = GoodJob::Adapter::InlineBuffer.capture do
      record.transaction do
        record.with_advisory_lock(function: "pg_advisory_xact_lock") do
          record.update!(enqueued_at: Time.current)

          # During inline execution, this could enqueue and execute further jobs
          record._continue_discard_or_finish(lock: false)
        end
      end
    end
    buffer.call
  end

  active_jobs
end

#retryObject



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'app/models/good_job/batch.rb', line 146

def retry
  Rails.application.executor.wrap do
    buffer = GoodJob::Adapter::InlineBuffer.capture do
      record.transaction do
        record.with_advisory_lock(function: "pg_advisory_xact_lock") do
          update_attributes = { discarded_at: nil, finished_at: nil }
          update_attributes[:jobs_finished_at] = nil if GoodJob::BatchRecord.jobs_finished_at_migrated?
          record.update!(update_attributes)
          record.jobs.discarded.each(&:retry_job)
          record._continue_discard_or_finish(lock: false)
        end
      end
    end
    buffer.call
  end
end