Class: GoodJob::Job
- Inherits:
- ActiveRecord::Base
- Object
- ActiveRecord::Base
- GoodJob::Job
- Includes:
- Lockable
- Defined in:
- lib/good_job/job.rb
Overview
ActiveRecord model that represents an ActiveJob job. The parent class can be configured with GoodJob.active_record_parent_class.
Constant Summary
- PreviouslyPerformedError = Class.new(StandardError)
Raised if something attempts to execute a previously completed Job again.
- DEFAULT_QUEUE_NAME = 'default'
ActiveJob jobs without a queue_name attribute are placed on this queue.
- DEFAULT_PRIORITY = 0
ActiveJob jobs without a priority attribute are given this priority.
Constants included from Lockable
Lockable::RecordAlreadyAdvisoryLockedError
Class Method Summary
- .display_all(after_scheduled_at: nil, after_id: nil) ⇒ ActiveRecord::Relation
Get Jobs in display order with optional keyset pagination.
- .enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false) ⇒ Job
Places an ActiveJob job on a queue by creating a new Job record.
- .finished(timestamp = nil) ⇒ ActiveRecord::Relation
Get Jobs that were completed before the given timestamp.
- .next_scheduled_at(after: nil, limit: 100, now_limit: nil) ⇒ Array<DateTime>
Fetches the scheduled execution time of the next eligible Job(s).
- .only_scheduled ⇒ ActiveRecord::Relation
Get Jobs that are not scheduled for a later time than now (i.e. jobs that are not scheduled or scheduled for earlier than the current time).
- .perform_with_advisory_lock ⇒ ExecutionResult?
Finds the next eligible Job, acquires an advisory lock related to it, and executes the job.
- .priority_ordered ⇒ ActiveRecord::Relation
Order jobs by priority (highest priority first).
- .queue_parser(string) ⇒ Hash
Parse a string representing a group of queues into a more readable data structure.
- .queue_string(string) ⇒ ActiveRecord::Relation
Get Jobs on queues that match the given queue string.
- .running ⇒ ActiveRecord::Relation
Get Jobs that have started performing but have not yet finished.
- .schedule_ordered ⇒ ActiveRecord::Relation
Order jobs by scheduled time (unscheduled or soonest first).
- .unfinished ⇒ ActiveRecord::Relation
Get Jobs that have not yet been completed.
- .with_job_class ⇒ ActiveRecord::Relation
Get Jobs with the given class name.
Instance Method Summary
- #executable? ⇒ Boolean
Tests whether this job is safe to be executed by this thread.
- #perform ⇒ ExecutionResult
Execute the ActiveJob job this Job represents.
Methods included from Lockable
#advisory_lock, #advisory_lock!, #advisory_locked?, #advisory_unlock, #advisory_unlock!, #lockable_key, #owns_advisory_lock?, #with_advisory_lock
Class Method Details
.display_all(after_scheduled_at: nil, after_id: nil) ⇒ ActiveRecord::Relation
Get Jobs in display order with optional keyset pagination.
    # File 'lib/good_job/job.rb', line 148
    scope :display_all, (lambda do |after_scheduled_at: nil, after_id: nil|
      query = order(Arel.sql('COALESCE(scheduled_at, created_at) DESC, id DESC'))
      if after_scheduled_at.present? && after_id.present?
        query = query.where(Arel.sql('(COALESCE(scheduled_at, created_at), id) < (:after_scheduled_at, :after_id)'), after_scheduled_at: after_scheduled_at, after_id: after_id)
      elsif after_scheduled_at.present?
        query = query.where(Arel.sql('(COALESCE(scheduled_at, created_at)) < (:after_scheduled_at)'), after_scheduled_at: after_scheduled_at)
      end
      query
    end)
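The keyset condition compares the pair (COALESCE(scheduled_at, created_at), id) against the cursor as a tuple, so a page starts strictly after the last row of the previous page. The following is a minimal in-memory sketch of that ordering and cursor logic, not GoodJob code: the `display_page` helper, the `limit` parameter, the integer timestamps, and the sample rows are all hypothetical stand-ins for Job records.

```ruby
# Hypothetical in-memory stand-in for the display_all keyset logic.
# Rows are plain hashes and timestamps are integers for brevity.
def display_page(rows, after_scheduled_at: nil, after_id: nil, limit: 2)
  # Sort key mirrors COALESCE(scheduled_at, created_at) DESC, id DESC.
  key = ->(row) { [row[:scheduled_at] || row[:created_at], row[:id]] }
  sorted = rows.sort_by(&key).reverse

  if after_scheduled_at && after_id
    # Tuple comparison: keep rows whose key sorts strictly below the cursor.
    sorted = sorted.select { |row| (key.call(row) <=> [after_scheduled_at, after_id]) == -1 }
  elsif after_scheduled_at
    sorted = sorted.select { |row| (row[:scheduled_at] || row[:created_at]) < after_scheduled_at }
  end

  sorted.first(limit)
end

ROWS = [
  { id: 1, scheduled_at: nil, created_at: 10 },
  { id: 2, scheduled_at: 30,  created_at: 5 },
  { id: 3, scheduled_at: nil, created_at: 30 },
  { id: 4, scheduled_at: 20,  created_at: 1 }
].freeze

first_page = display_page(ROWS)
cursor = first_page.last
second_page = display_page(
  ROWS,
  after_scheduled_at: cursor[:scheduled_at] || cursor[:created_at],
  after_id: cursor[:id]
)
```

Rows 2 and 3 share the same effective time (30), which is why the id is part of the cursor: without it, the second page could skip or repeat one of them.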
.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false) ⇒ Job
Places an ActiveJob job on a queue by creating a new GoodJob::Job record.
    # File 'lib/good_job/job.rb', line 210
    def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false)
      ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload|
        good_job_args = {
          active_job_id: active_job.job_id,
          queue_name: active_job.queue_name.presence || DEFAULT_QUEUE_NAME,
          priority: active_job.priority || DEFAULT_PRIORITY,
          serialized_params: active_job.serialize,
          scheduled_at: scheduled_at,
          create_with_advisory_lock: create_with_advisory_lock,
        }

        good_job_args[:concurrency_key] = active_job.good_job_concurrency_key if active_job.respond_to?(:good_job_concurrency_key)

        if CurrentExecution.cron_key
          good_job_args[:cron_key] = CurrentExecution.cron_key
        elsif CurrentExecution.active_job_id == active_job.job_id
          good_job_args[:cron_key] = CurrentExecution.good_job.cron_key
        end

        good_job = GoodJob::Job.new(**good_job_args)

        instrument_payload[:good_job] = good_job

        good_job.save!
        active_job.provider_job_id = good_job.id

        CurrentExecution.good_job.retried_good_job_id = good_job.id if CurrentExecution.good_job && CurrentExecution.good_job.active_job_id == active_job.job_id

        good_job
      end
    end
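The queue name and priority defaults applied by .enqueue can be illustrated in isolation. This is a sketch only: `FakeJob` and `enqueue_defaults` are hypothetical names, and the blank check is a plain-Ruby approximation of ActiveSupport's `presence`, which is assumed here to treat nil, empty, and whitespace-only strings as absent.

```ruby
DEFAULT_QUEUE_NAME = 'default'
DEFAULT_PRIORITY = 0

# Hypothetical stand-in for an ActiveJob instance; only the two
# attributes relevant to defaulting are modelled.
FakeJob = Struct.new(:queue_name, :priority, keyword_init: true)

# Returns the queue_name and priority that would be stored on the record.
def enqueue_defaults(job)
  blank_queue = job.queue_name.to_s.strip.empty?
  {
    queue_name: blank_queue ? DEFAULT_QUEUE_NAME : job.queue_name,
    priority: job.priority || DEFAULT_PRIORITY
  }
end

unset    = enqueue_defaults(FakeJob.new(queue_name: nil, priority: nil))
explicit = enqueue_defaults(FakeJob.new(queue_name: 'mailers', priority: 10))
```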
.finished(timestamp = nil) ⇒ ActiveRecord::Relation
Get Jobs that were completed before the given timestamp. If no timestamp is provided, get all jobs that have been completed. By default, GoodJob deletes jobs after they are completed, so this will find no jobs. However, if you have changed GoodJob.preserve_job_records, this may find completed Jobs.
    # File 'lib/good_job/job.rb', line 98
    scope :finished, ->(timestamp = nil) { timestamp ? where(arel_table['finished_at'].lteq(timestamp)) : where.not(finished_at: nil) }
.next_scheduled_at(after: nil, limit: 100, now_limit: nil) ⇒ Array<DateTime>
Fetches the scheduled execution time of the next eligible Job(s).
    # File 'lib/good_job/job.rb', line 186
    def self.next_scheduled_at(after: nil, limit: 100, now_limit: nil)
      query = advisory_unlocked.unfinished.schedule_ordered

      after ||= Time.current
      after_query = query.where('scheduled_at > ?', after).or query.where(scheduled_at: nil).where('created_at > ?', after)
      after_at = after_query.limit(limit).pluck(:scheduled_at, :created_at).map { |timestamps| timestamps.compact.first }

      if now_limit&.positive?
        now_query = query.where('scheduled_at < ?', Time.current).or query.where(scheduled_at: nil)
        now_at = now_query.limit(now_limit).pluck(:scheduled_at, :created_at).map { |timestamps| timestamps.compact.first }
      end

      Array(now_at) + after_at
    end
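Each plucked row is a [scheduled_at, created_at] pair, and `compact.first` picks scheduled_at when it is set and falls back to created_at otherwise. A small plain-Ruby sketch of that step, with made-up sample times in place of database rows:

```ruby
# Each pair mimics one row returned by pluck(:scheduled_at, :created_at).
rows = [
  [Time.utc(2021, 1, 1, 12, 0), Time.utc(2021, 1, 1, 9, 0)],  # scheduled job
  [nil,                         Time.utc(2021, 1, 1, 10, 0)]  # unscheduled job
]

# Drop the nil (if any) and take the first remaining value.
effective_times = rows.map { |timestamps| timestamps.compact.first }

# When now_limit is not given, now_at is nil and Array(nil) is [],
# so the result is just the "after" times.
combined = Array(nil) + effective_times
```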
.only_scheduled ⇒ ActiveRecord::Relation
Get Jobs that are not scheduled for a later time than now (i.e. jobs that are not scheduled or scheduled for earlier than the current time).
    # File 'lib/good_job/job.rb', line 74
    scope :only_scheduled, -> { where(arel_table['scheduled_at'].lteq(Time.current)).or(where(scheduled_at: nil)) }
.perform_with_advisory_lock ⇒ ExecutionResult?
Finds the next eligible Job, acquires an advisory lock related to it, and executes the job.
    # File 'lib/good_job/job.rb', line 165
    def self.perform_with_advisory_lock
      unfinished.priority_ordered.only_scheduled.limit(1).with_advisory_lock(unlock_session: true) do |good_jobs|
        good_job = good_jobs.first
        break if good_job.blank?
        break :unlocked unless good_job&.executable?

        begin
          good_job.with_advisory_lock(key: "good_jobs-#{good_job.active_job_id}") do
            good_job.perform
          end
        rescue RecordAlreadyAdvisoryLockedError => e
          ExecutionResult.new(value: nil, handled_error: e)
        end
      end
    end
.priority_ordered ⇒ ActiveRecord::Relation
Order jobs by priority (highest priority first).
    # File 'lib/good_job/job.rb', line 80
    scope :priority_ordered, -> { order('priority DESC NULLS LAST') }
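The effect of `priority DESC NULLS LAST` can be reproduced on a plain array to show where rows without a priority end up. This is an in-memory illustration with made-up values, not a query against GoodJob's table:

```ruby
priorities = [5, nil, 10, 0, nil]

# Non-nil values sort first (group 0) in descending order;
# nil values sort last (group 1), mirroring NULLS LAST.
ordered = priorities.sort_by { |priority| priority.nil? ? [1, 0] : [0, -priority] }
# => [10, 5, 0, nil, nil]
```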
.queue_parser(string) ⇒ Hash
Parse a string representing a group of queues into a more readable data structure.
    # File 'lib/good_job/job.rb', line 36
    def self.queue_parser(string)
      string = string.presence || '*'

      if string.first == '-'
        exclude_queues = true
        string = string[1..-1]
      end

      queues = string.split(',').map(&:strip)

      if queues.include?('*')
        { all: true }
      elsif exclude_queues
        { exclude: queues }
      else
        { include: queues }
      end
    end
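To show what the parser returns for typical inputs, here is a plain-Ruby re-creation that runs without Rails. `parse_queues` is a hypothetical name, and the blank check and `start_with?` are stand-ins for the ActiveSupport helpers `presence` and `String#first` used in the source; the queue names are made up.

```ruby
# Plain-Ruby approximation of queue_parser for experimentation.
def parse_queues(string)
  string = '*' if string.nil? || string.strip.empty?

  # A leading '-' turns the list into an exclusion list.
  exclude_queues = string.start_with?('-')
  string = string[1..-1] if exclude_queues

  queues = string.split(',').map(&:strip)

  if queues.include?('*')
    { all: true }
  elsif exclude_queues
    { exclude: queues }
  else
    { include: queues }
  end
end

parse_queues(nil)              # => { all: true }
parse_queues('mailers, low')   # => { include: ['mailers', 'low'] }
parse_queues('-mailers,low')   # => { exclude: ['mailers', 'low'] }
parse_queues('-*')             # => { all: true }
```

Note the last case: because the wildcard check runs before the exclusion check, '-*' is treated the same as '*'.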
.queue_string(string) ⇒ ActiveRecord::Relation
Get Jobs on queues that match the given queue string.
    # File 'lib/good_job/job.rb', line 128
    scope :queue_string, (lambda do |string|
      parsed = queue_parser(string)

      if parsed[:all]
        all
      elsif parsed[:exclude]
        where.not(queue_name: parsed[:exclude]).or where(queue_name: nil)
      elsif parsed[:include]
        where(queue_name: parsed[:include])
      end
    end)
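The exclude branch adds `or where(queue_name: nil)` because a SQL `NOT IN` comparison never matches rows whose queue_name is NULL, so they have to be included explicitly. The sketch below applies the same three branches to an in-memory array; `filter_by_queue` and the sample rows are hypothetical, and the parsed hash is passed in directly in the shape queue_parser returns.

```ruby
# Hypothetical in-memory equivalent of the queue_string scope.
def filter_by_queue(rows, parsed)
  if parsed[:all]
    rows
  elsif parsed[:exclude]
    # Rows with a nil queue_name are kept, matching the explicit
    # "or queue_name IS NULL" in the SQL version.
    rows.reject { |row| parsed[:exclude].include?(row[:queue_name]) }
  elsif parsed[:include]
    rows.select { |row| parsed[:include].include?(row[:queue_name]) }
  end
end

JOBS = [
  { id: 1, queue_name: 'default' },
  { id: 2, queue_name: 'mailers' },
  { id: 3, queue_name: nil }
].freeze

excluded = filter_by_queue(JOBS, { exclude: ['mailers'] })
included = filter_by_queue(JOBS, { include: ['mailers'] })
```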
.running ⇒ ActiveRecord::Relation
Get Jobs that have started performing but have not yet finished.
    # File 'lib/good_job/job.rb', line 104
    scope :running, -> { where.not(performed_at: nil).where(finished_at: nil) }
.schedule_ordered ⇒ ActiveRecord::Relation
Order jobs by scheduled time (unscheduled or soonest first).
    # File 'lib/good_job/job.rb', line 86
    scope :schedule_ordered, -> { order(Arel.sql('COALESCE(scheduled_at, created_at) ASC')) }
.unfinished ⇒ ActiveRecord::Relation
Get Jobs that have not yet been completed.
    # File 'lib/good_job/job.rb', line 67
    scope :unfinished, -> { where(finished_at: nil) }
.with_job_class ⇒ ActiveRecord::Relation
Get Jobs with the given class name.
    # File 'lib/good_job/job.rb', line 61
    scope :with_job_class, ->(job_class) { where("serialized_params->>'job_class' = ?", job_class) }
Instance Method Details
#executable? ⇒ Boolean
Tests whether this job is safe to be executed by this thread.
    # File 'lib/good_job/job.rb', line 272
    def executable?
      self.class.unscoped.unfinished.owns_advisory_locked.exists?(id: id)
    end
#perform ⇒ ExecutionResult
Execute the ActiveJob job this GoodJob::Job represents.
    # File 'lib/good_job/job.rb', line 247
    def perform
      raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at

      self.performed_at = Time.current
      save! if GoodJob.preserve_job_records

      result = execute

      job_error = result.handled_error || result.unhandled_error
      self.error = "#{job_error.class}: #{job_error.message}" if job_error

      if result.unhandled_error && GoodJob.retry_on_unhandled_error
        save!
      elsif GoodJob.preserve_job_records == true || (result.unhandled_error && GoodJob.preserve_job_records == :on_unhandled_error)
        self.finished_at = Time.current
        save!
      else
        destroy!
      end

      result
    end
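The final branch of #perform decides what happens to the record once execution returns. The sketch below restates that decision as a pure function so the combinations are easy to check. `record_outcome` and its return symbols are hypothetical names; the three inputs correspond to `result.unhandled_error`, `GoodJob.retry_on_unhandled_error`, and `GoodJob.preserve_job_records` in the source.

```ruby
# Hypothetical restatement of the save!/destroy! decision in #perform.
def record_outcome(unhandled_error:, retry_on_unhandled_error:, preserve_job_records:)
  if unhandled_error && retry_on_unhandled_error
    # Saved without finished_at, so the job stays eligible to run again.
    :saved_unfinished
  elsif preserve_job_records == true || (unhandled_error && preserve_job_records == :on_unhandled_error)
    # finished_at is set and the record is kept.
    :saved_finished
  else
    :destroyed
  end
end

record_outcome(unhandled_error: false, retry_on_unhandled_error: false, preserve_job_records: false)
# => :destroyed
record_outcome(unhandled_error: true, retry_on_unhandled_error: true, preserve_job_records: true)
# => :saved_unfinished
record_outcome(unhandled_error: true, retry_on_unhandled_error: false, preserve_job_records: :on_unhandled_error)
# => :saved_finished
```

Because the retry check comes first, an unhandled error with retries enabled leaves the record unfinished regardless of the preserve_job_records setting.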