Class: Delayed::Job

Inherits:
ActiveRecord::Base show all
Includes:
Backend::Base
Defined in:
app/models/delayed/job.rb

Constant Summary collapse

REENQUEUE_BUFFER =
30.seconds

Constants included from Backend::Base

Backend::Base::ParseObjectFromYaml

Instance Attribute Summary

Attributes included from Backend::Base

#error

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Backend::Base

#destroy_failed_jobs?, #fail!, #failed?, #hook, included, #invoke_job, #max_attempts, #max_run_time, #name, #payload_object, #payload_object=, #priority, #priority=, #reschedule_at, #unlock

Methods inherited from ActiveRecord::Base

#to_yaml_properties, yaml_new

Class Method Details

.clear_locks!(worker_name) ⇒ Object

When a worker is exiting, make sure we don’t have any locked jobs.



39
40
41
# File 'app/models/delayed/job.rb', line 39

def self.clear_locks!(worker_name)
  where(locked_by: worker_name).update_all(locked_by: nil, locked_at: nil)
end

.connection_configObject



202
203
204
# File 'app/models/delayed/job.rb', line 202

def self.connection_config
  connection_db_config.configuration_hash
end

.database_adapter_nameObject



197
198
199
# File 'app/models/delayed/job.rb', line 197

def self.database_adapter_name
  connection_config[:adapter]
end

.database_nameObject



193
194
195
# File 'app/models/delayed/job.rb', line 193

def self.database_name
  connection_config[:database]
end

.db_time_nowObject

Get the current time (GMT or local depending on DB) Note: This does not ping the DB to get the time, so all your clients must have syncronized clocks.



163
164
165
166
167
168
169
170
171
# File 'app/models/delayed/job.rb', line 163

def self.db_time_now
  if Time.zone
    Time.zone.now
  elsif default_timezone == :utc
    Time.now.utc
  else
    Time.current
  end
end

.default_timezoneObject



174
175
176
# File 'app/models/delayed/job.rb', line 174

def self.default_timezone
  ActiveRecord.default_timezone
end

.ready_to_run(worker_name, max_run_time) ⇒ Object



29
30
31
32
33
34
35
36
# File 'app/models/delayed/job.rb', line 29

def self.ready_to_run(worker_name, max_run_time)
  where(
    "((run_at <= ? AND (locked_at IS NULL OR locked_at < ?)) OR locked_by = ?) AND failed_at IS NULL",
    db_time_now,
    db_time_now - (max_run_time + REENQUEUE_BUFFER),
    worker_name,
  )
end

.reserve(worker, max_run_time = Worker.max_run_time) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
# File 'app/models/delayed/job.rb', line 43

def self.reserve(worker, max_run_time = Worker.max_run_time)
  ready_scope =
    ready_to_run(worker.name, max_run_time)
      .min_priority(worker.min_priority)
      .max_priority(worker.max_priority)
      .for_queues(worker.queues)
      .by_priority

  ActiveSupport::Notifications.instrument('delayed.worker.reserve_jobs', worker_tags(worker)) do
    reserve_with_scope(ready_scope, worker, db_time_now)
  end
end

.reserve_with_scope(ready_scope, worker, now) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'app/models/delayed/job.rb', line 56

def self.reserve_with_scope(ready_scope, worker, now)
  case connection.adapter_name
    when "PostgreSQL", "PostGIS"
      reserve_with_scope_using_optimized_postgres(ready_scope, worker, now)
    when "MySQL", "Mysql2"
      reserve_with_scope_using_optimized_mysql(ready_scope, worker, now)
    when "MSSQL", "Teradata"
      reserve_with_scope_using_optimized_mssql(ready_scope, worker, now)
    # Fallback for unknown / other DBMS
    else
      reserve_with_scope_using_default_sql(ready_scope, worker, now)
  end
end

.reserve_with_scope_using_default_sql(ready_scope, worker, now) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'app/models/delayed/job.rb', line 70

def self.reserve_with_scope_using_default_sql(ready_scope, worker, now)
  # This is our old fashion, tried and true, but possibly slower lookup
  # Instead of reading the entire job record for our detect loop, we select only the id,
  # and only read the full job record after we've successfully locked the job.
  # This can have a noticable impact on large read_ahead configurations and large payload jobs.
  attrs = { locked_at: now, locked_by: worker.name }

  jobs = []
  ready_scope.limit(worker.read_ahead).select(:id).each do |job|
    break if jobs.count >= worker.max_claims
    next unless ready_scope.where(id: job.id).update_all(attrs) == 1

    jobs << job.reload
  end

  jobs
end

.reserve_with_scope_using_optimized_mssql(ready_scope, worker, now) ⇒ Object



145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'app/models/delayed/job.rb', line 145

def self.reserve_with_scope_using_optimized_mssql(ready_scope, worker, now)
  # The MSSQL driver doesn't generate a limit clause when update_all
  # is called directly
  subsubquery_sql = ready_scope.limit(1).to_sql
  # select("id") doesn't generate a subquery, so force a subquery
  subquery_sql = "SELECT id FROM (#{subsubquery_sql}) AS x"
  quoted_table_name = connection.quote_table_name(table_name)
  sql = "UPDATE #{quoted_table_name} SET locked_at = ?, locked_by = ? WHERE id IN (#{subquery_sql})"
  count = connection.execute(sanitize_sql([sql, now, worker.name]))
  return [] if count.zero?

  # MSSQL JDBC doesn't support OUTPUT INSERTED.* for returning a result set, so query locked row
  where(locked_at: now, locked_by: worker.name, failed_at: nil)
end

.reserve_with_scope_using_optimized_mysql(ready_scope, worker, now) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'app/models/delayed/job.rb', line 109

def self.reserve_with_scope_using_optimized_mysql(ready_scope, worker, now)
  # Removing the millisecond precision from now(time object)
  # MySQL 5.6.4 onwards millisecond precision exists, but the
  # datetime object created doesn't have precision, so discarded
  # while updating. But during the where clause, for mysql(>=5.6.4),
  # it queries with precision as well. So removing the precision
  now = now.change(usec: 0)
  # Despite MySQL's support of UPDATE...LIMIT, it has an optimizer bug
  # that results in filesorts rather than index scans, which is very
  # expensive with a large number of jobs in the table:
  # http://bugs.mysql.com/bug.php?id=74049
  # The PostgreSQL and MSSQL reserve strategies, while valid syntax in
  # MySQL, result in deadlocks so we use a SELECT then UPDATE strategy
  # that is more likely to false-negative when attempting to reserve
  # jobs in parallel but doesn't rely on subselects or transactions.

  # Also, we are fetching multiple candidate_jobs at a time to try to
  # avoid the situation where multiple workers try to grab the same
  # job at the same time. That previously had caused poor performance
  # since ready_scope.where(id: job.id) would return nothing even
  # though there was a large number of jobs on the queue.
  attrs = { locked_at: now, locked_by: worker.name }

  jobs = []
  ready_scope.limit(worker.read_ahead).each do |job|
    break if jobs.count >= worker.max_claims
    next unless ready_scope.where(id: job.id).update_all(attrs) == 1

    job.assign_attributes(attrs)
    job.send(:changes_applied)
    jobs << job
  end

  jobs
end

.reserve_with_scope_using_optimized_postgres(ready_scope, worker, now) ⇒ Object

rubocop:disable Metrics/AbcSize



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'app/models/delayed/job.rb', line 88

def self.reserve_with_scope_using_optimized_postgres(ready_scope, worker, now) # rubocop:disable Metrics/AbcSize
  # Custom SQL required for PostgreSQL because postgres does not support UPDATE...LIMIT
  # This locks the single record 'FOR UPDATE' in the subquery
  # http://www.postgresql.org/docs/9.0/static/sql-select.html#SQL-FOR-UPDATE-SHARE
  # Note: active_record would attempt to generate UPDATE...LIMIT like
  # SQL for Postgres if we use a .limit() filter, but it would not
  # use 'FOR UPDATE' and we would have many locking conflicts
  table = connection.quote_table_name(table_name)

  # Rather than relying on a primary key, we use "WHERE ctid =", resulting in a fast 'Tid Scan'.
  if worker.max_claims > 1
    subquery = ready_scope.limit(worker.max_claims).lock("FOR UPDATE SKIP LOCKED").select("ctid").to_sql
    sql = "UPDATE #{table} SET locked_at = ?, locked_by = ? WHERE ctid = ANY (ARRAY (#{subquery})) RETURNING *"
  else
    subquery = ready_scope.limit(1).lock("FOR UPDATE SKIP LOCKED").select("ctid").to_sql
    sql = "UPDATE #{table} SET locked_at = ?, locked_by = ? WHERE ctid = (#{subquery}) RETURNING *"
  end

  find_by_sql([sql, now, worker.name]).sort_by(&:priority)
end

.set_delayed_job_table_nameObject



22
23
24
25
# File 'app/models/delayed/job.rb', line 22

def self.set_delayed_job_table_name
  delayed_job_table_name = "#{::ActiveRecord::Base.table_name_prefix}delayed_jobs"
  self.table_name = delayed_job_table_name
end

.worker_tags(worker) ⇒ Object



179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'app/models/delayed/job.rb', line 179

def self.worker_tags(worker)
  {
    min_priority: worker.min_priority,
    max_priority: worker.max_priority,
    max_claims: worker.max_claims,
    read_ahead: worker.read_ahead,
    queues: worker.queues,
    table: table_name,
    database: database_name,
    database_adapter: database_adapter_name,
    worker: worker,
  }
end

Instance Method Details

#ageObject



236
237
238
# File 'app/models/delayed/job.rb', line 236

def age
  [(locked_at || self.class.db_time_now) - run_at, 0].max
end

#age_alert?Boolean

Returns:

  • (Boolean)


244
245
246
# File 'app/models/delayed/job.rb', line 244

def age_alert?
  alert_age&.<= age
end

#alert_ageObject



212
213
214
215
216
217
218
# File 'app/models/delayed/job.rb', line 212

def alert_age
  if payload_object.respond_to?(:alert_age)
    payload_object.alert_age
  else
    priority.alert_age
  end
end

#alert_attemptsObject



228
229
230
231
232
233
234
# File 'app/models/delayed/job.rb', line 228

def alert_attempts
  if payload_object.respond_to?(:alert_attempts)
    payload_object.alert_attempts
  else
    priority.alert_attempts
  end
end

#alert_run_timeObject



220
221
222
223
224
225
226
# File 'app/models/delayed/job.rb', line 220

def alert_run_time
  if payload_object.respond_to?(:alert_run_time)
    payload_object.alert_run_time
  else
    priority.alert_run_time
  end
end

#attempts_alert?Boolean

Returns:

  • (Boolean)


252
253
254
# File 'app/models/delayed/job.rb', line 252

def attempts_alert?
  alert_attempts&.<= attempts
end

#reload(*args) ⇒ Object



207
208
209
210
# File 'app/models/delayed/job.rb', line 207

def reload(*args)
  reset
  super
end

#run_timeObject



240
241
242
# File 'app/models/delayed/job.rb', line 240

def run_time
  self.class.db_time_now - locked_at if locked_at
end

#run_time_alert?Boolean

Returns:

  • (Boolean)


248
249
250
# File 'app/models/delayed/job.rb', line 248

def run_time_alert?
  alert_run_time&.<= run_time if run_time # locked_at may be `nil` if `delay_jobs` is false
end