Class: GoodJob::Job

Inherits:
ActiveRecord::Base
  • Object
show all
Includes:
Lockable
Defined in:
lib/good_job/job.rb

Overview

Represents a request to perform an ActiveJob job.

Constant Summary collapse

PreviouslyPerformedError =

Raised if something attempts to execute a previously completed Job again.

Class.new(StandardError)
DEFAULT_QUEUE_NAME =

ActiveJob jobs without a queue_name attribute are placed on this queue.

'default'.freeze
DEFAULT_PRIORITY =

ActiveJob jobs without a priority attribute are given this priority.

0

Constants included from Lockable

Lockable::RecordAlreadyAdvisoryLockedError

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Lockable

#advisory_lock, #advisory_lock!, #advisory_locked?, #advisory_unlock, #advisory_unlock!, #owns_advisory_lock?, #with_advisory_lock

Class Method Details

.display_all(after_scheduled_at: nil, after_id: nil) ⇒ ActiveRecord::Relation

Get Jobs in display order with optional keyset pagination.

Parameters:

  • after_scheduled_at (DateTime, String, nil) (defaults to: nil)

    Display records scheduled after this time for keyset pagination

  • after_id (Numeric, String, nil) (defaults to: nil)

    Display records after this ID for keyset pagination

Returns:

  • (ActiveRecord::Relation)


117
118
119
120
121
122
123
124
125
# File 'lib/good_job/job.rb', line 117

# Jobs in display order, with optional keyset pagination.
#
# Rows are ordered by COALESCE(scheduled_at, created_at) descending, then by
# id descending. When both cursor values are given, only rows whose
# (time, id) pair sorts strictly before the cursor are returned. When only
# +after_scheduled_at+ is given, the filter uses the time alone. An
# +after_id+ supplied without +after_scheduled_at+ is ignored.
#
# @param after_scheduled_at [DateTime, String, nil] time component of the cursor
# @param after_id [Numeric, String, nil] id component of the cursor
# @return [ActiveRecord::Relation]
scope :display_all, (lambda do |after_scheduled_at: nil, after_id: nil|
  ordered = order(Arel.sql('COALESCE(scheduled_at, created_at) DESC, id DESC'))

  if after_scheduled_at.blank?
    # No usable cursor: return the ordered relation unfiltered.
    ordered
  elsif after_id.present?
    # Full cursor: compare the (time, id) row value against the cursor tuple.
    ordered.where(
      Arel.sql('(COALESCE(scheduled_at, created_at), id) < (:after_scheduled_at, :after_id)'),
      after_scheduled_at: after_scheduled_at,
      after_id: after_id
    )
  else
    # Time-only cursor.
    ordered.where(
      Arel.sql('(COALESCE(scheduled_at, created_at)) < (:after_scheduled_at)'),
      after_scheduled_at: after_scheduled_at
    )
  end
end)

.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false) ⇒ Job

Places an ActiveJob job on a queue by creating a new GoodJob::Job record.

Parameters:

  • active_job (ActiveJob::Base)

    The job to enqueue.

  • scheduled_at (Float) (defaults to: nil)

    Epoch timestamp when the job should be executed.

  • create_with_advisory_lock (Boolean) (defaults to: false)

    Whether to establish a lock on the GoodJob::Job record after it is created.

Returns:

  • (Job)

    The new GoodJob::Job instance representing the queued ActiveJob job.



159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/good_job/job.rb', line 159

# Places an ActiveJob job on a queue by creating a new GoodJob::Job record.
#
# The record is built, saved, and linked back to the ActiveJob (through
# +provider_job_id+) inside an "enqueue_job.good_job" instrumentation block.
# The new record is also added to the instrumentation payload under
# +:good_job+ before it is saved.
#
# @param active_job [ActiveJob::Base] the job to enqueue
# @param scheduled_at [Float, nil] when the job should be executed
# @param create_with_advisory_lock [Boolean] forwarded to the new record
# @return [Job] the newly saved GoodJob::Job
def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false)
  payload = {
    active_job: active_job,
    scheduled_at: scheduled_at,
    create_with_advisory_lock: create_with_advisory_lock,
  }
  created_job = nil

  ActiveSupport::Notifications.instrument("enqueue_job.good_job", payload) do |event_payload|
    attributes = {
      # Fall back to the defaults when the ActiveJob does not specify these.
      queue_name: active_job.queue_name.presence || DEFAULT_QUEUE_NAME,
      priority: active_job.priority || DEFAULT_PRIORITY,
      serialized_params: active_job.serialize,
      scheduled_at: scheduled_at,
      create_with_advisory_lock: create_with_advisory_lock,
    }
    created_job = GoodJob::Job.new(attributes)

    # Expose the record to subscribers before it is persisted.
    event_payload[:good_job] = created_job

    created_job.save!
    active_job.provider_job_id = created_job.id
  end

  created_job
end

.finished(timestamp = nil) ⇒ ActiveRecord::Relation

Get Jobs that were completed before the given timestamp. If no timestamp is provided, get all jobs that have been completed. By default, GoodJob deletes jobs after they are completed and this will find no jobs. However, if you have changed GoodJob.preserve_job_records, this may find completed Jobs.

Parameters:

  • timestamp (Float) (defaults to: nil)

    Get jobs that finished before this time (in epoch time).

Returns:

  • (ActiveRecord::Relation)


85
# File 'lib/good_job/job.rb', line 85

# Jobs that have finished.
#
# With a timestamp, selects jobs whose finished_at is at or before it (the
# comparison is inclusive). Without one, selects every job that has a
# finished_at value.
#
# @param timestamp [Float, nil] upper bound for finished_at
# @return [ActiveRecord::Relation]
scope :finished, (lambda do |timestamp = nil|
  if timestamp
    where(arel_table['finished_at'].lteq(timestamp))
  else
    where.not(finished_at: nil)
  end
end)

.only_scheduled ⇒ ActiveRecord::Relation

Get Jobs that are not scheduled for a later time than now (i.e. jobs that are not scheduled or scheduled for earlier than the current time).

Returns:

  • (ActiveRecord::Relation)


67
# File 'lib/good_job/job.rb', line 67

# Jobs that are not scheduled for a time later than now: either scheduled_at
# is at or before the current time, or scheduled_at is not set at all.
#
# @return [ActiveRecord::Relation]
scope :only_scheduled, (lambda do
  due = where(arel_table['scheduled_at'].lteq(Time.current))
  unscheduled = where(scheduled_at: nil)
  due.or(unscheduled)
end)

.perform_with_advisory_lock ⇒ Array<(GoodJob::Job, Object, Exception)>?

Finds the next eligible Job, acquires an advisory lock related to it, and executes the job.

Returns:

  • (Array<(GoodJob::Job, Object, Exception)>, nil)

    If a job was executed, returns an array with the GoodJob::Job record, the return value for the job’s #perform method, and the exception the job raised, if any (if the job raised, then the second array entry will be nil). If there were no jobs to execute, returns nil.



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/good_job/job.rb', line 134

# Finds the next eligible Job, acquires an advisory lock related to it, and
# executes the job.
#
# Candidates are unfinished jobs that are due to run, taken one at a time in
# priority order.
#
# @return [Array<(GoodJob::Job, Object, Exception)>, nil]
#   the job record, the value from the job's #perform, and the error it
#   produced (if any); nil when no job was fetched. Note that a job fetched
#   without owning its advisory lock is not performed, yet is still returned
#   as [job, nil, nil].
def self.perform_with_advisory_lock
  job = value = failure = nil

  candidates = unfinished.priority_ordered.only_scheduled.limit(1)
  candidates.with_advisory_lock do |locked_jobs|
    job = locked_jobs.first
    # TODO: Determine why some records are fetched without an advisory lock at all
    break unless job&.owns_advisory_lock?

    value, failure = job.perform
  end

  return unless job

  [job, value, failure]
end

.priority_ordered ⇒ ActiveRecord::Relation

Order jobs by priority (highest priority first).

Returns:

  • (ActiveRecord::Relation)


73
# File 'lib/good_job/job.rb', line 73

# Orders jobs by priority, largest value first; rows with a NULL priority
# sort last.
#
# @return [ActiveRecord::Relation]
scope :priority_ordered, (lambda do
  order('priority DESC NULLS LAST')
end)

.queue_parser(string) ⇒ Hash

Parse a string representing a group of queues into a more readable data structure.

Examples:

GoodJob::Job.queue_parser('-queue1,queue2')
=> { exclude: [ 'queue1', 'queue2' ] }

Returns:

  • (Hash)

    How to match a given queue. It can have the following keys and values:

    • { all: true } indicates that all queues match.

    • { exclude: Array<String> } indicates the listed queue names should not match.

    • { include: Array<String> } indicates the listed queue names should match.



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/good_job/job.rb', line 30

# Parse a string representing a group of queues into a more readable data
# structure.
#
# A blank or nil string is treated as '*'. A leading '-' marks the list as
# an exclusion list. Names are comma-separated and stripped of surrounding
# whitespace. If any name is '*', every queue matches, whether or not the
# list was marked as an exclusion.
#
# @example
#   GoodJob::Job.queue_parser('-queue1,queue2')
#   # => { exclude: ['queue1', 'queue2'] }
# @param string [String, nil] the queue expression to parse
# @return [Hash] one of { all: true }, { exclude: Array<String> } or
#   { include: Array<String> }
def self.queue_parser(string)
  spec = string.presence || '*'

  negated = spec.start_with?('-')
  spec = spec[1..-1] if negated

  names = spec.split(',').map(&:strip)

  # A wildcard anywhere in the list wins over include/exclude.
  return { all: true } if names.include?('*')

  negated ? { exclude: names } : { include: names }
end

.queue_string(string) ⇒ ActiveRecord::Relation

Get Jobs on queues that match the given queue string.

Parameters:

  • string (String)

    A string expression describing what queues to select. See queue_parser or README for more details on the format of the string. Note this only handles individual semicolon-separated segments of that string format.

Returns:

  • (ActiveRecord::Relation)


97
98
99
100
101
102
103
104
105
106
107
# File 'lib/good_job/job.rb', line 97

# Jobs on queues that match the given queue string.
#
# The string is interpreted by queue_parser:
# - all queues: no restriction is applied;
# - exclusion list: jobs whose queue_name is not in the list, plus jobs with
#   no queue_name;
# - inclusion list: jobs whose queue_name is in the list.
#
# @param string [String] a queue expression (see queue_parser)
# @return [ActiveRecord::Relation]
scope :queue_string, (lambda do |string|
  rules = queue_parser(string)

  if rules[:all]
    all
  elsif rules[:exclude]
    # Rows with a NULL queue_name are added back explicitly.
    not_excluded = where.not(queue_name: rules[:exclude])
    not_excluded.or(where(queue_name: nil))
  elsif rules[:include]
    where(queue_name: rules[:include])
  end
end)

.unfinished ⇒ ActiveRecord::Relation

Get Jobs that have not yet been completed.

Returns:

  • (ActiveRecord::Relation)


53
54
55
56
57
58
59
60
# File 'lib/good_job/job.rb', line 53

# Jobs that have not yet been completed (finished_at is not set).
#
# If the table has no finished_at column, a deprecation warning is emitted
# and the lambda returns nil instead of a filtered relation.
#
# @return [ActiveRecord::Relation]
scope :unfinished, (lambda do
  has_finished_at = column_names.include?('finished_at')

  ActiveSupport::Deprecation.warn('GoodJob expects a good_jobs.finished_at column to exist. Please see the GoodJob README.md for migration instructions.') unless has_finished_at

  where(finished_at: nil) if has_finished_at
end)

Instance Method Details

#perform ⇒ Array<(Object, Exception)>

Execute the ActiveJob job this GoodJob::Job represents.

Returns:

  • (Array<(Object, Exception)>)

    An array of the return value of the job’s #perform method and the exception raised by the job, if any. If the job completed successfully, the second array entry (the exception) will be nil and vice versa.

Raises:

  • (PreviouslyPerformedError)

    If the job has already been performed (its finished_at is set).



184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/good_job/job.rb', line 184

# Execute the ActiveJob job this GoodJob::Job represents.
#
# After execution the record is handled in one of three ways:
# - an unhandled error occurred and GoodJob.retry_on_unhandled_error is set:
#   the record is saved without a finished_at, so it is still unfinished;
# - records are preserved (always, or only on unhandled errors and one
#   occurred): finished_at is set and the record is saved;
# - otherwise the record is destroyed.
#
# @return [Array<(Object, Exception)>] the job's return value and the error
#   associated with this run, if any. A job that returns an Exception is
#   reported as an error with a nil value.
# @raise [PreviouslyPerformedError] if finished_at is already set
def perform
  raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at

  GoodJob::CurrentExecution.reset

  self.performed_at = Time.current
  save! if GoodJob.preserve_job_records

  value, raised = execute

  # An Exception handed back as the job's result counts as an error, not a value.
  returned_error = value.is_a?(Exception) ? value : nil
  value = nil if returned_error

  # First available error wins: raised, returned, then those recorded on the
  # current execution for retry or discard.
  failure = raised ||
            returned_error ||
            GoodJob::CurrentExecution.error_on_retry ||
            GoodJob::CurrentExecution.error_on_discard

  self.error = "#{failure.class}: #{failure.message}" if failure

  if raised && GoodJob.retry_on_unhandled_error
    # Saved without finished_at, so the record is still unfinished.
    save!
  elsif GoodJob.preserve_job_records == true || (raised && GoodJob.preserve_job_records == :on_unhandled_error)
    self.finished_at = Time.current
    save!
  else
    destroy!
  end

  [value, failure]
end