Class: GoodJob::Batch
- Inherits:
-
Object
- Object
- GoodJob::Batch
- Includes:
- GlobalID::Identification
- Defined in:
- app/models/good_job/batch.rb
Overview
NOTE: This class delegates to BatchRecord and is intended to be the public interface for Batches.
Constant Summary collapse
- PROTECTED_PROPERTIES =
%i[on_finish on_success on_discard callback_queue_name callback_priority description properties].freeze
Class Method Summary collapse
-
.enqueue(active_jobs = [], **properties, &block) ⇒ GoodJob::Batch
Create a new batch and enqueue it.
- .find(id) ⇒ Object
- .primary_key ⇒ Object
-
.within_thread(batch_id: nil, batch_callback_id: nil) ⇒ Object
Helper method to enqueue jobs and assign them to a batch.
Instance Method Summary collapse
- #_record ⇒ Object
- #active_jobs ⇒ Object
-
#add(active_jobs = nil, &block) ⇒ Array<ActiveJob::Base>
Enqueue jobs and add them to the batch.
- #assign_properties(properties) ⇒ Object
- #callback_active_jobs ⇒ Object
-
#enqueue(active_jobs = [], **properties, &block) ⇒ Array<ActiveJob::Base>
Assigns the given properties, saves the batch, and enqueues the given jobs into it. Returns the active jobs added to the batch.
-
#initialize(_record: nil, **properties) ⇒ Batch
constructor
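Wraps the given BatchRecord (or a new BatchRecord when none is given) and assigns the given properties. (The source line carries a `rubocop:disable Lint/UnderscorePrefixedVariableName` comment for the `_record` keyword.)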
- #retry ⇒ Object
Constructor Details
#initialize(_record: nil, **properties) ⇒ Batch
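Wraps the given BatchRecord (or a new BatchRecord when none is given) and assigns the given properties. (The source line carries a `rubocop:disable Lint/UnderscorePrefixedVariableName` comment for the `_record` keyword.)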
86 87 88 89 |
# File 'app/models/good_job/batch.rb', line 86 def initialize(_record: nil, **properties) # rubocop:disable Lint/UnderscorePrefixedVariableName self.record = _record || BatchRecord.new assign_properties(properties) end |
Class Method Details
.enqueue(active_jobs = [], **properties, &block) ⇒ GoodJob::Batch
Create a new batch and enqueue it
58 59 60 61 62 |
# File 'app/models/good_job/batch.rb', line 58 def self.enqueue(active_jobs = [], **properties, &block) new.tap do |batch| batch.enqueue(active_jobs, **properties, &block) end end |
.find(id) ⇒ Object
68 69 70 |
# File 'app/models/good_job/batch.rb', line 68 def self.find(id) new _record: BatchRecord.find(id) end |
.primary_key ⇒ Object
64 65 66 |
# File 'app/models/good_job/batch.rb', line 64 def self.primary_key :id end |
.within_thread(batch_id: nil, batch_callback_id: nil) ⇒ Object
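Helper method to enqueue jobs and assign them to a batch. Sets the current batch id and batch callback id for the duration of the given block, then restores the previous values afterwards, even if the block raises.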
73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'app/models/good_job/batch.rb', line 73 def self.within_thread(batch_id: nil, batch_callback_id: nil) original_batch_id = current_batch_id original_batch_callback_id = current_batch_callback_id self.current_batch_id = batch_id self.current_batch_callback_id = batch_callback_id yield ensure self.current_batch_id = original_batch_id self.current_batch_callback_id = original_batch_callback_id end |
Instance Method Details
#_record ⇒ Object
178 179 180 |
# File 'app/models/good_job/batch.rb', line 178 def _record record end |
#active_jobs ⇒ Object
163 164 165 |
# File 'app/models/good_job/batch.rb', line 163 def active_jobs record.jobs.map(&:active_job) end |
#add(active_jobs = nil, &block) ⇒ Array<ActiveJob::Base>
Enqueue jobs and add them to the batch
132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'app/models/good_job/batch.rb', line 132 def add(active_jobs = nil, &block) record.save if record.new_record? buffer = Bulk::Buffer.new buffer.add(active_jobs) buffer.capture(&block) if block self.class.within_thread(batch_id: id) do buffer.enqueue end buffer.active_jobs end |
#assign_properties(properties) ⇒ Object
171 172 173 174 175 176 |
# File 'app/models/good_job/batch.rb', line 171 def assign_properties(properties) properties = properties.dup batch_attrs = PROTECTED_PROPERTIES.index_with { |key| properties.delete(key) }.compact record.assign_attributes(batch_attrs) self.properties.merge!(properties) end |
#callback_active_jobs ⇒ Object
167 168 169 |
# File 'app/models/good_job/batch.rb', line 167 def callback_active_jobs record.callback_jobs.map(&:active_job) end |
#enqueue(active_jobs = [], **properties, &block) ⇒ Array<ActiveJob::Base>
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'app/models/good_job/batch.rb', line 92 def enqueue(active_jobs = [], **properties, &block) assign_properties(properties) if record.new_record? record.save! else record.transaction do record.with_advisory_lock(function: "pg_advisory_xact_lock") do record.enqueued_at_will_change! record.jobs_finished_at_will_change! if GoodJob::BatchRecord.jobs_finished_at_migrated? record.finished_at_will_change! update_attributes = { discarded_at: nil, finished_at: nil } update_attributes[:jobs_finished_at] = nil if GoodJob::BatchRecord.jobs_finished_at_migrated? record.update!(**update_attributes) end end end active_jobs = add(active_jobs, &block) Rails.application.executor.wrap do buffer = GoodJob::Adapter::InlineBuffer.capture do record.transaction do record.with_advisory_lock(function: "pg_advisory_xact_lock") do record.update!(enqueued_at: Time.current) # During inline execution, this could enqueue and execute further jobs record._continue_discard_or_finish(lock: false) end end end buffer.call end active_jobs end |
#retry ⇒ Object
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
# File 'app/models/good_job/batch.rb', line 146 def retry Rails.application.executor.wrap do buffer = GoodJob::Adapter::InlineBuffer.capture do record.transaction do record.with_advisory_lock(function: "pg_advisory_xact_lock") do update_attributes = { discarded_at: nil, finished_at: nil } update_attributes[:jobs_finished_at] = nil if GoodJob::BatchRecord.jobs_finished_at_migrated? record.update!(update_attributes) record.jobs.discarded.each(&:retry_job) record._continue_discard_or_finish(lock: false) end end end buffer.call end end |