Class: Delayed::Backend::Redis::Job

Inherits:
Object
  • Object
show all
Extended by:
ActiveModel::Callbacks
Includes:
ActiveModel::Dirty, Base
Defined in:
lib/delayed/backend/redis/job.rb

Direct Known Subclasses

Failed

Defined Under Namespace

Modules: Keys Classes: Failed

Constant Summary collapse

WAITING_STRAND_JOB_PRIORITY =
2000000
COLUMNS =
[]
TIMESTAMP_COLUMNS =

We store time attributes in redis as floats so we don’t have to do timestamp parsing in lua.

[]
INTEGER_COLUMNS =
[]

Constants included from Base

Base::ON_HOLD_COUNT, Base::ON_HOLD_LOCKED_BY

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Base

#batch?, #expired?, #failed?, #full_name, #hold!, included, #inferred_max_attempts, #initialize_defaults, #invoke_job, #locked?, #name, #on_hold?, #payload_object, #payload_object=, #permanent_failure, #reschedule, #reschedule_at, #unhold!, #unlock

Constructor Details

#initialize(attrs = {}) ⇒ Job

Returns a new instance of Job.



137
138
139
140
141
142
# File 'lib/delayed/backend/redis/job.rb', line 137

def initialize(attrs = {})
  attrs.each { |k, v| self.send("#{k}=", v) }
  self.priority ||= 0
  self.attempts ||= 0
  @new_record = true
end

Instance Attribute Details

#on_conflictObject

not saved, just used as a marker when creating



368
369
370
# File 'lib/delayed/backend/redis/job.rb', line 368

def on_conflict
  @on_conflict
end

#singletonObject

not saved, just used as a marker when creating



368
369
370
# File 'lib/delayed/backend/redis/job.rb', line 368

def singleton
  @singleton
end

Class Method Details

.bulk_update(action, opts) ⇒ Object

perform a bulk update of a set of jobs action is :hold, :unhold, or :destroy to specify the jobs to act on, either pass opts = [list of job ids] or opts = <some flavor> to perform on all jobs of that flavor

see the list_jobs action for the list of available flavors and the meaning of opts for each



351
352
353
354
355
356
# File 'lib/delayed/backend/redis/job.rb', line 351

def self.bulk_update(action, opts)
  if %w(current future).include?(opts[:flavor].to_s)
    opts[:query] ||= Delayed::Settings.queue
  end
  functions.bulk_update(action, opts[:ids], opts[:flavor], opts[:query], db_time_now)
end

.clear_locks!(worker_name) ⇒ Object



325
326
327
328
329
330
331
# File 'lib/delayed/backend/redis/job.rb', line 325

def self.clear_locks!(worker_name)
  self.running_jobs.each do |job|
    # TODO: mark the job as failed one attempt
    job.unlock! if job.locked_by == worker_name
  end
  nil
end

.column(name, type) ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/delayed/backend/redis/job.rb', line 99

def self.column(name, type)
  COLUMNS << name

  if type == :timestamp
    TIMESTAMP_COLUMNS << name
  elsif type == :integer
    INTEGER_COLUMNS << name
  end

  attr_reader(name)
  define_attribute_methods([name])
  # Custom attr_writer that updates the dirty status.
  class_eval(<<-EOS, __FILE__, __LINE__ + 1)
    def #{name}=(new_value)
      #{name}_will_change! unless new_value == self.#{name}
      @#{name} = new_value
    end
  EOS
end

.create(attrs = {}) ⇒ Object



150
151
152
153
154
# File 'lib/delayed/backend/redis/job.rb', line 150

def self.create(attrs = {})
  result = new(attrs)
  result.save
  result
end

.create!(attrs = {}) ⇒ Object



156
157
158
159
160
# File 'lib/delayed/backend/redis/job.rb', line 156

def self.create!(attrs = {})
  result = new(attrs)
  result.save!
  result
end

.create_singleton(options) ⇒ Object



358
359
360
# File 'lib/delayed/backend/redis/job.rb', line 358

def self.create_singleton(options)
  self.create!(options.merge(:singleton => true))
end

.find(ids) ⇒ Object



170
171
172
173
174
175
176
# File 'lib/delayed/backend/redis/job.rb', line 170

def self.find(ids)
  if Array === ids
    find_some(ids, {})
  else
    find_one(ids, {})
  end
end

.find_available(limit, queue = Delayed::Settings.queue, min_priority = Delayed::MIN_PRIORITY, max_priority = Delayed::MAX_PRIORITY) ⇒ Object



254
255
256
257
258
259
260
261
262
263
# File 'lib/delayed/backend/redis/job.rb', line 254

def self.find_available(limit,
    queue = Delayed::Settings.queue,
    min_priority = Delayed::MIN_PRIORITY,
    max_priority = Delayed::MAX_PRIORITY)

  check_queue(queue)
  check_priorities(min_priority, max_priority)

  self.find(functions.find_available(queue, limit, 0, min_priority, max_priority, db_time_now))
end

.find_one(id, options) ⇒ Object



206
207
208
209
# File 'lib/delayed/backend/redis/job.rb', line 206

def self.find_one(id, options)
  job = self.get_with_ids([id]).first
  job || raise(ActiveRecord::RecordNotFound, "Couldn't find Job with ID=#{id}")
end

.find_some(ids, options) ⇒ Object



211
212
213
# File 'lib/delayed/backend/redis/job.rb', line 211

def self.find_some(ids, options)
  self.get_with_ids(ids).compact
end

.functionsObject



202
203
204
# File 'lib/delayed/backend/redis/job.rb', line 202

def self.functions
  @@functions ||= Delayed::Backend::Redis::Functions.new(redis)
end

.get_and_lock_next_available(worker_name, queue = Delayed::Settings.queue, min_priority = Delayed::MIN_PRIORITY, max_priority = Delayed::MAX_PRIORITY, prefetch: nil, prefetch_owner: nil, forced_latency: nil) ⇒ Object



223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
# File 'lib/delayed/backend/redis/job.rb', line 223

def self.get_and_lock_next_available(worker_name,
    queue = Delayed::Settings.queue,
    min_priority = Delayed::MIN_PRIORITY,
    max_priority = Delayed::MAX_PRIORITY,
    prefetch: nil,
    prefetch_owner: nil,
    forced_latency: nil)

  check_queue(queue)
  check_priorities(min_priority, max_priority)
  if worker_name.is_a?(Array)
    multiple_workers = true
    worker_name = worker_name.first
  end

  # as an optimization this lua function returns the hash of job attributes,
  # rather than just a job id, saving a round trip
  now = db_time_now
  now -= forced_latency if forced_latency
  job_attrs = functions.get_and_lock_next_available(worker_name, queue, min_priority, max_priority, now)
  job = instantiate_from_attrs(job_attrs) # will return nil if the attrs are blank
  if multiple_workers
    if job.nil?
      job = {}
    else
      job = { worker_name => job }
    end
  end
  job
end

.get_with_ids(ids) ⇒ Object



215
216
217
# File 'lib/delayed/backend/redis/job.rb', line 215

def self.get_with_ids(ids)
  ids.map { |id| self.instantiate_from_attrs(redis.hgetall(key_for_job_id(id))) }
end

.instantiate(attrs) ⇒ Object



144
145
146
147
148
# File 'lib/delayed/backend/redis/job.rb', line 144

def self.instantiate(attrs)
  result = new(attrs)
  result.instance_variable_set(:@new_record, false)
  result
end

.jobs_count(flavor, queue = Delayed::Settings.queue) ⇒ Object

get the total job count for the given flavor flavor is :current, :future or :failed for the :failed flavor, queue is currently ignored



301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
# File 'lib/delayed/backend/redis/job.rb', line 301

def self.jobs_count(flavor,
    queue = Delayed::Settings.queue)
  case flavor.to_s
    when 'current'
      check_queue(queue)
      redis.zcard(Keys::QUEUE[queue])
    when 'future'
      check_queue(queue)
      redis.zcard(Keys::FUTURE_QUEUE[queue])
    when 'failed'
      redis.zcard(Keys::FAILED_JOBS)
    else
      raise ArgumentError, "invalid flavor: #{flavor.inspect}"
  end
end

.key_for_job_id(job_id) ⇒ Object



219
220
221
# File 'lib/delayed/backend/redis/job.rb', line 219

def self.key_for_job_id(job_id)
  Keys::JOB[job_id]
end

.list_jobs(flavor, limit, offset = 0, query = nil) ⇒ Object

get a list of jobs of the given flavor in the given queue flavor is :current, :future, :failed, :strand or :tag depending on the flavor, query has a different meaning: for :current and :future, it’s the queue name (defaults to Delayed::Settings.queue) for :strand it’s the strand name for :tag it’s the tag name for :failed it’s ignored



272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
# File 'lib/delayed/backend/redis/job.rb', line 272

def self.list_jobs(flavor,
    limit,
    offset = 0,
    query = nil)
  case flavor.to_s
    when 'current'
      query ||= Delayed::Settings.queue
      check_queue(query)
      self.find(functions.find_available(query, limit, offset, 0, "+inf", db_time_now))
    when 'future'
      query ||= Delayed::Settings.queue
      check_queue(query)
      self.find(redis.zrangebyscore(Keys::FUTURE_QUEUE[query], 0, "+inf", :limit => [offset, limit]))
    when 'failed'
      Failed.find(redis.zrevrangebyscore(Keys::FAILED_JOBS, "+inf", 0, :limit => [offset, limit]))
    when 'strand'
      self.find(redis.lrange(Keys::STRAND[query], offset, offset + limit - 1))
    when 'tag'
      # This is optimized for writing, since list_jobs(:tag) will only ever happen in the admin UI
      ids = redis.smembers(Keys::TAG[query])
      self.find(ids[offset, limit])
    else
      raise ArgumentError, "invalid flavor: #{flavor.inspect}"
  end
end

.reconnect!Object



194
195
196
197
198
199
200
# File 'lib/delayed/backend/redis/job.rb', line 194

def self.reconnect!
  # redis cluster responds to reconnect directly,
  # but individual redis needs it to be called on client
  redis.respond_to?(:reconnect) ?
    redis.reconnect :
    redis.client.reconnect
end

.running_jobsObject



321
322
323
# File 'lib/delayed/backend/redis/job.rb', line 321

def self.running_jobs()
  self.find(redis.zrangebyscore(Keys::RUNNING_JOBS, 0, "+inf"))
end

.strand_size(strand) ⇒ Object



317
318
319
# File 'lib/delayed/backend/redis/job.rb', line 317

def self.strand_size(strand)
  redis.llen(Keys::STRAND[strand])
end

.tag_counts(flavor, limit, offset = 0) ⇒ Object

returns a list of hashes { :tag => tag_name, :count => current_count } in descending count order flavor is :current or :all

Raises:

  • (ArgumentError)


336
337
338
339
340
341
342
# File 'lib/delayed/backend/redis/job.rb', line 336

def self.tag_counts(flavor,
    limit,
    offset = 0)
  raise(ArgumentError, "invalid flavor: #{flavor.inspect}") unless %w(current all).include?(flavor.to_s)
  key = Keys::TAG_COUNTS[flavor]
  redis.zrevrangebyscore(key, '+inf', 1, :limit => [offset, limit], :withscores => true).map { |tag, count| { :tag => tag, :count => count } }
end

.unlock(jobs) ⇒ Object



362
363
364
365
# File 'lib/delayed/backend/redis/job.rb', line 362

def self.unlock(jobs)
  jobs.each(&:unlock!)
  jobs.length
end

Instance Method Details

#==(other) ⇒ Object



186
187
188
# File 'lib/delayed/backend/redis/job.rb', line 186

def ==(other)
  other.is_a?(self.class) && id == other.id
end

#[](key) ⇒ Object



162
163
164
# File 'lib/delayed/backend/redis/job.rb', line 162

def [](key)
  send(key)
end

#[]=(key, value) ⇒ Object



166
167
168
# File 'lib/delayed/backend/redis/job.rb', line 166

def []=(key, value)
  send("#{key}=", value)
end

#changes_appliedObject



399
400
401
402
# File 'lib/delayed/backend/redis/job.rb', line 399

def changes_applied
  @previously_changed = changes
  @changed_attributes.clear
end

#create_and_lock!(worker_name) ⇒ Object



423
424
425
426
# File 'lib/delayed/backend/redis/job.rb', line 423

def create_and_lock!(worker_name)
  raise "job already exists" unless new_record?
  lock_in_redis!(worker_name)
end

#destroyObject



409
410
411
412
413
# File 'lib/delayed/backend/redis/job.rb', line 409

def destroy
  self.class.functions.destroy_job(id, self.class.db_time_now)
  @destroyed = true
  freeze
end

#destroyed?Boolean

Returns:

  • (Boolean)


182
183
184
# File 'lib/delayed/backend/redis/job.rb', line 182

def destroyed?
  !!@destroyed
end

#fail!Object



428
429
430
431
432
433
434
# File 'lib/delayed/backend/redis/job.rb', line 428

def fail!
  self.failed_at = self.class.db_time_now
  save!
  redis.rename Keys::JOB[id], Keys::FAILED_JOB[id]
  tickle_strand
  self
end

#hashObject



190
191
192
# File 'lib/delayed/backend/redis/job.rb', line 190

def hash
  id.hash
end

#lock_in_redis!(worker_name) ⇒ Object



374
375
376
377
378
# File 'lib/delayed/backend/redis/job.rb', line 374

def lock_in_redis!(worker_name)
  self.locked_at = self.class.db_time_now
  self.locked_by = worker_name
  save
end

#new_record?Boolean

Returns:

  • (Boolean)


178
179
180
# File 'lib/delayed/backend/redis/job.rb', line 178

def new_record?
  !!@new_record
end

#save(*a) ⇒ Object



385
386
387
388
389
390
391
392
393
394
395
396
# File 'lib/delayed/backend/redis/job.rb', line 385

def save(*a)
  return false if destroyed?
  result = run_callbacks(:save) do
    if new_record?
      run_callbacks(:create) { create }
    else
      update
    end
  end
  changes_applied
  result
end

#save!(*a) ⇒ Object



405
406
407
# File 'lib/delayed/backend/redis/job.rb', line 405

def save!(*a)
  save(*a) || raise(RecordNotSaved)
end

#tickle_strandObject

take this job off the strand, and queue up the next strand job if this job was at the front



417
418
419
420
421
# File 'lib/delayed/backend/redis/job.rb', line 417

def tickle_strand
  if strand.present?
    self.class.functions.tickle_strand(id, strand, self.class.db_time_now)
  end
end

#transfer_lock!(from:, to:) ⇒ Object



370
371
372
# File 'lib/delayed/backend/redis/job.rb', line 370

def transfer_lock!(from:, to:)
  lock_in_redis!(to)
end

#unlock!Object



380
381
382
383
# File 'lib/delayed/backend/redis/job.rb', line 380

def unlock!
  unlock
  save!
end