Class: GoodJob::Job

Inherits:
ActiveRecord::Base
  • Object
show all
Includes:
Lockable
Defined in:
lib/good_job/job.rb,
lib/good_job/job.rb

Overview

ActiveRecord model that represents an ActiveJob job. Parent class can be configured with GoodJob.active_record_parent_class.

Constant Summary collapse

PreviouslyPerformedError =

Raised if something attempts to execute a previously completed Job again.

Class.new(StandardError)
DEFAULT_QUEUE_NAME =

ActiveJob jobs without a queue_name attribute are placed on this queue.

'default'
DEFAULT_PRIORITY =

ActiveJob jobs without a priority attribute are given this priority.

0

Constants included from Lockable

Lockable::RecordAlreadyAdvisoryLockedError

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Lockable

#advisory_lock, #advisory_lock!, #advisory_locked?, #advisory_unlock, #advisory_unlock!, #lockable_key, #owns_advisory_lock?, #with_advisory_lock

Class Method Details

.display_all(after_scheduled_at: nil, after_id: nil) ⇒ ActiveRecord::Relation

Get Jobs in display order with optional keyset pagination.

Parameters:

  • after_scheduled_at (DateTime, String, nil) (defaults to: nil)

    Display records scheduled after this time for keyset pagination

  • after_id (Numeric, String, nil) (defaults to: nil)

    Display records after this ID for keyset pagination

Returns:

  • (ActiveRecord::Relation)


148
149
150
151
152
153
154
155
156
# File 'lib/good_job/job.rb', line 148

scope :display_all, (lambda do |after_scheduled_at: nil, after_id: nil|
  query = order(Arel.sql('COALESCE(scheduled_at, created_at) DESC, id DESC'))
  if after_scheduled_at.present? && after_id.present?
    query = query.where(Arel.sql('(COALESCE(scheduled_at, created_at), id) < (:after_scheduled_at, :after_id)'), after_scheduled_at: after_scheduled_at, after_id: after_id)
  elsif after_scheduled_at.present?
    query = query.where(Arel.sql('(COALESCE(scheduled_at, created_at)) < (:after_scheduled_at)'), after_scheduled_at: after_scheduled_at)
  end
  query
end)

.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false) ⇒ Job

Places an ActiveJob job on a queue by creating a new GoodJob::Job record.

Parameters:

  • active_job (ActiveJob::Base)

    The job to enqueue.

  • scheduled_at (Float) (defaults to: nil)

    Epoch timestamp when the job should be executed.

  • create_with_advisory_lock (Boolean) (defaults to: false)

    Whether to establish a lock on the GoodJob::Job record after it is created.

Returns:

  • (Job)

    The new GoodJob::Job instance representing the queued ActiveJob job.



210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
# File 'lib/good_job/job.rb', line 210

def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false)
  ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload|
    good_job_args = {
      active_job_id: active_job.job_id,
      queue_name: active_job.queue_name.presence || DEFAULT_QUEUE_NAME,
      priority: active_job.priority || DEFAULT_PRIORITY,
      serialized_params: active_job.serialize,
      scheduled_at: scheduled_at,
      create_with_advisory_lock: create_with_advisory_lock,
    }

    good_job_args[:concurrency_key] = active_job.good_job_concurrency_key if active_job.respond_to?(:good_job_concurrency_key)

    if CurrentExecution.cron_key
      good_job_args[:cron_key] = CurrentExecution.cron_key
    elsif CurrentExecution.active_job_id == active_job.job_id
      good_job_args[:cron_key] = CurrentExecution.good_job.cron_key
    end

    good_job = GoodJob::Job.new(**good_job_args)

    instrument_payload[:good_job] = good_job

    good_job.save!
    active_job.provider_job_id = good_job.id

    CurrentExecution.good_job.retried_good_job_id = good_job.id if CurrentExecution.good_job && CurrentExecution.good_job.active_job_id == active_job.job_id

    good_job
  end
end

.finished(timestamp = nil) ⇒ ActiveRecord::Relation

Get Jobs were completed before the given timestamp. If no timestamp is provided, get all jobs that have been completed. By default, GoodJob deletes jobs after they are completed and this will find no jobs. However, if you have changed GoodJob.preserve_job_records, this may find completed Jobs.

Parameters:

  • timestamp (Float) (defaults to: nil)

    Get jobs that finished before this time (in epoch time).

Returns:

  • (ActiveRecord::Relation)


98
# File 'lib/good_job/job.rb', line 98

scope :finished, ->(timestamp = nil) { timestamp ? where(arel_table['finished_at'].lteq(timestamp)) : where.not(finished_at: nil) }

.next_scheduled_at(after: nil, limit: 100, now_limit: nil) ⇒ Array<DateTime>

Fetches the scheduled execution time of the next eligible Job(s).

Parameters:

  • after (DateTime) (defaults to: nil)
  • limit (Integer) (defaults to: 100)
  • now_limit (Integer, nil) (defaults to: nil)

Returns:

  • (Array<DateTime>)


186
187
188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/good_job/job.rb', line 186

def self.next_scheduled_at(after: nil, limit: 100, now_limit: nil)
  query = advisory_unlocked.unfinished.schedule_ordered

  after ||= Time.current
  after_query = query.where('scheduled_at > ?', after).or query.where(scheduled_at: nil).where('created_at > ?', after)
  after_at = after_query.limit(limit).pluck(:scheduled_at, :created_at).map { |timestamps| timestamps.compact.first }

  if now_limit&.positive?
    now_query = query.where('scheduled_at < ?', Time.current).or query.where(scheduled_at: nil)
    now_at = now_query.limit(now_limit).pluck(:scheduled_at, :created_at).map { |timestamps| timestamps.compact.first }
  end

  Array(now_at) + after_at
end

.only_scheduledActiveRecord::Relation

Get Jobs that are not scheduled for a later time than now (i.e. jobs that are not scheduled or scheduled for earlier than the current time).

Returns:

  • (ActiveRecord::Relation)


74
# File 'lib/good_job/job.rb', line 74

scope :only_scheduled, -> { where(arel_table['scheduled_at'].lteq(Time.current)).or(where(scheduled_at: nil)) }

.perform_with_advisory_lockExecutionResult?

Finds the next eligible Job, acquire an advisory lock related to it, and executes the job.

Returns:

  • (ExecutionResult, nil)

    If a job was executed, returns an array with the GoodJob::Job record, the return value for the job’s #perform method, and the exception the job raised, if any (if the job raised, then the second array entry will be nil). If there were no jobs to execute, returns nil.



165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/good_job/job.rb', line 165

def self.perform_with_advisory_lock
  unfinished.priority_ordered.only_scheduled.limit(1).with_advisory_lock(unlock_session: true) do |good_jobs|
    good_job = good_jobs.first
    break if good_job.blank?
    break :unlocked unless good_job&.executable?

    begin
      good_job.with_advisory_lock(key: "good_jobs-#{good_job.active_job_id}") do
        good_job.perform
      end
    rescue RecordAlreadyAdvisoryLockedError => e
      ExecutionResult.new(value: nil, handled_error: e)
    end
  end
end

.priority_orderedActiveRecord::Relation

Order jobs by priority (highest priority first).

Returns:

  • (ActiveRecord::Relation)


80
# File 'lib/good_job/job.rb', line 80

scope :priority_ordered, -> { order('priority DESC NULLS LAST') }

.queue_parser(string) ⇒ Hash

Parse a string representing a group of queues into a more readable data structure.

Examples:

GoodJob::Job.queue_parser('-queue1,queue2')
=> { exclude: [ 'queue1', 'queue2' ] }

Parameters:

  • string (String)

    Queue string

Returns:

  • (Hash)

    How to match a given queue. It can have the following keys and values:

    • { all: true } indicates that all queues match.

    • { exclude: Array<String> } indicates the listed queue names should not match.

    • { include: Array<String> } indicates the listed queue names should match.



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/good_job/job.rb', line 36

def self.queue_parser(string)
  string = string.presence || '*'

  if string.first == '-'
    exclude_queues = true
    string = string[1..-1]
  end

  queues = string.split(',').map(&:strip)

  if queues.include?('*')
    { all: true }
  elsif exclude_queues
    { exclude: queues }
  else
    { include: queues }
  end
end

.queue_string(string) ⇒ ActiveRecord::Relation

Get Jobs on queues that match the given queue string.

Parameters:

  • string (String)

    A string expression describing what queues to select. See queue_parser or README for more details on the format of the string. Note this only handles individual semicolon-separated segments of that string format.

Returns:

  • (ActiveRecord::Relation)


128
129
130
131
132
133
134
135
136
137
138
# File 'lib/good_job/job.rb', line 128

scope :queue_string, (lambda do |string|
  parsed = queue_parser(string)

  if parsed[:all]
    all
  elsif parsed[:exclude]
    where.not(queue_name: parsed[:exclude]).or where(queue_name: nil)
  elsif parsed[:include]
    where(queue_name: parsed[:include])
  end
end)

.runningActiveRecord::Relation

Get Jobs have errored that will not be retried further

Returns:

  • (ActiveRecord::Relation)


104
# File 'lib/good_job/job.rb', line 104

scope :running, -> { where.not(performed_at: nil).where(finished_at: nil) }

.schedule_orderedActiveRecord::Relation

Order jobs by scheduled (unscheduled or soonest first).

Returns:

  • (ActiveRecord::Relation)


86
# File 'lib/good_job/job.rb', line 86

scope :schedule_ordered, -> { order(Arel.sql('COALESCE(scheduled_at, created_at) ASC')) }

.unfinishedActiveRecord::Relation

Get Jobs that have not yet been completed.

Returns:

  • (ActiveRecord::Relation)


67
# File 'lib/good_job/job.rb', line 67

scope :unfinished, -> { where(finished_at: nil) }

.with_job_classActiveRecord::Relation

Get Jobs with given class name

Parameters:

  • string (String)

    Job class name

Returns:

  • (ActiveRecord::Relation)


61
# File 'lib/good_job/job.rb', line 61

scope :with_job_class, ->(job_class) { where("serialized_params->>'job_class' = ?", job_class) }

Instance Method Details

#executable?Boolean

Tests whether this job is safe to be executed by this thread.

Returns:

  • (Boolean)


272
273
274
# File 'lib/good_job/job.rb', line 272

def executable?
  self.class.unscoped.unfinished.owns_advisory_locked.exists?(id: id)
end

#performExecutionResult

Execute the ActiveJob job this GoodJob::Job represents.

Returns:

  • (ExecutionResult)

    An array of the return value of the job’s #perform method and the exception raised by the job, if any. If the job completed successfully, the second array entry (the exception) will be nil and vice versa.

Raises:



247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
# File 'lib/good_job/job.rb', line 247

def perform
  raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at

  self.performed_at = Time.current
  save! if GoodJob.preserve_job_records

  result = execute

  job_error = result.handled_error || result.unhandled_error
  self.error = "#{job_error.class}: #{job_error.message}" if job_error

  if result.unhandled_error && GoodJob.retry_on_unhandled_error
    save!
  elsif GoodJob.preserve_job_records == true || (result.unhandled_error && GoodJob.preserve_job_records == :on_unhandled_error)
    self.finished_at = Time.current
    save!
  else
    destroy!
  end

  result
end