Class: Delayed::Backend::Redis::Job

Inherits:
Object
  • Object
show all
Extended by:
ActiveModel::Callbacks
Includes:
ActiveModel::Dirty, Base
Defined in:
lib/delayed/backend/redis/job.rb

Direct Known Subclasses

Failed

Defined Under Namespace

Modules: Keys Classes: Failed

Constant Summary collapse

WAITING_STRAND_JOB_PRIORITY =
2000000
COLUMNS =
[]
TIMESTAMP_COLUMNS =

We store time attributes in redis as floats so we don’t have to do timestamp parsing in lua.

[]
INTEGER_COLUMNS =
[]

Constants included from Base

Base::ON_HOLD_COUNT, Base::ON_HOLD_LOCKED_BY

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Base

#batch?, #expired?, #failed?, #full_name, #hold!, included, #initialize_defaults, #invoke_job, #locked?, #name, #on_hold?, #payload_object, #payload_object=, #permanent_failure, #reschedule, #reschedule_at, #unhold!, #unlock

Constructor Details

#initialize(attrs = {}) ⇒ Job

Returns a new instance of Job.



135
136
137
138
139
140
# File 'lib/delayed/backend/redis/job.rb', line 135

def initialize(attrs = {})
  attrs.each { |k, v| self.send("#{k}=", v) }
  self.priority ||= 0
  self.attempts ||= 0
  @new_record = true
end

Instance Attribute Details

#on_conflictObject

not saved, just used as a marker when creating



366
367
368
# File 'lib/delayed/backend/redis/job.rb', line 366

def on_conflict
  @on_conflict
end

#singletonObject

not saved, just used as a marker when creating



366
367
368
# File 'lib/delayed/backend/redis/job.rb', line 366

def singleton
  @singleton
end

Class Method Details

.bulk_update(action, opts) ⇒ Object

perform a bulk update of a set of jobs action is :hold, :unhold, or :destroy to specify the jobs to act on, either pass opts = [list of job ids] or opts = <some flavor> to perform on all jobs of that flavor

see the list_jobs action for the list of available flavors and the meaning of opts for each



349
350
351
352
353
354
# File 'lib/delayed/backend/redis/job.rb', line 349

def self.bulk_update(action, opts)
  if %w(current future).include?(opts[:flavor].to_s)
    opts[:query] ||= Delayed::Settings.queue
  end
  functions.bulk_update(action, opts[:ids], opts[:flavor], opts[:query], db_time_now)
end

.clear_locks!(worker_name) ⇒ Object



323
324
325
326
327
328
329
# File 'lib/delayed/backend/redis/job.rb', line 323

def self.clear_locks!(worker_name)
  self.running_jobs.each do |job|
    # TODO: mark the job as failed one attempt
    job.unlock! if job.locked_by == worker_name
  end
  nil
end

.column(name, type) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/delayed/backend/redis/job.rb', line 97

def self.column(name, type)
  COLUMNS << name

  if type == :timestamp
    TIMESTAMP_COLUMNS << name
  elsif type == :integer
    INTEGER_COLUMNS << name
  end

  attr_reader(name)
  define_attribute_methods([name])
  # Custom attr_writer that updates the dirty status.
  class_eval(<<-EOS, __FILE__, __LINE__ + 1)
    def #{name}=(new_value)
      #{name}_will_change! unless new_value == self.#{name}
      @#{name} = new_value
    end
  EOS
end

.create(attrs = {}) ⇒ Object



148
149
150
151
152
# File 'lib/delayed/backend/redis/job.rb', line 148

def self.create(attrs = {})
  result = new(attrs)
  result.save
  result
end

.create!(attrs = {}) ⇒ Object



154
155
156
157
158
# File 'lib/delayed/backend/redis/job.rb', line 154

def self.create!(attrs = {})
  result = new(attrs)
  result.save!
  result
end

.create_singleton(options) ⇒ Object



356
357
358
# File 'lib/delayed/backend/redis/job.rb', line 356

def self.create_singleton(options)
  self.create!(options.merge(:singleton => true))
end

.find(ids) ⇒ Object



168
169
170
171
172
173
174
# File 'lib/delayed/backend/redis/job.rb', line 168

def self.find(ids)
  if Array === ids
    find_some(ids, {})
  else
    find_one(ids, {})
  end
end

.find_available(limit, queue = Delayed::Settings.queue, min_priority = Delayed::MIN_PRIORITY, max_priority = Delayed::MAX_PRIORITY) ⇒ Object



252
253
254
255
256
257
258
259
260
261
# File 'lib/delayed/backend/redis/job.rb', line 252

def self.find_available(limit,
    queue = Delayed::Settings.queue,
    min_priority = Delayed::MIN_PRIORITY,
    max_priority = Delayed::MAX_PRIORITY)

  check_queue(queue)
  check_priorities(min_priority, max_priority)

  self.find(functions.find_available(queue, limit, 0, min_priority, max_priority, db_time_now))
end

.find_one(id, options) ⇒ Object



204
205
206
207
# File 'lib/delayed/backend/redis/job.rb', line 204

def self.find_one(id, options)
  job = self.get_with_ids([id]).first
  job || raise(ActiveRecord::RecordNotFound, "Couldn't find Job with ID=#{id}")
end

.find_some(ids, options) ⇒ Object



209
210
211
# File 'lib/delayed/backend/redis/job.rb', line 209

def self.find_some(ids, options)
  self.get_with_ids(ids).compact
end

.functionsObject



200
201
202
# File 'lib/delayed/backend/redis/job.rb', line 200

def self.functions
  @@functions ||= Delayed::Backend::Redis::Functions.new(redis)
end

.get_and_lock_next_available(worker_name, queue = Delayed::Settings.queue, min_priority = Delayed::MIN_PRIORITY, max_priority = Delayed::MAX_PRIORITY, prefetch: nil, prefetch_owner: nil, forced_latency: nil) ⇒ Object



221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
# File 'lib/delayed/backend/redis/job.rb', line 221

def self.get_and_lock_next_available(worker_name,
    queue = Delayed::Settings.queue,
    min_priority = Delayed::MIN_PRIORITY,
    max_priority = Delayed::MAX_PRIORITY,
    prefetch: nil,
    prefetch_owner: nil,
    forced_latency: nil)

  check_queue(queue)
  check_priorities(min_priority, max_priority)
  if worker_name.is_a?(Array)
    multiple_workers = true
    worker_name = worker_name.first
  end

  # as an optimization this lua function returns the hash of job attributes,
  # rather than just a job id, saving a round trip
  now = db_time_now
  now -= forced_latency if forced_latency
  job_attrs = functions.get_and_lock_next_available(worker_name, queue, min_priority, max_priority, now)
  job = instantiate_from_attrs(job_attrs) # will return nil if the attrs are blank
  if multiple_workers
    if job.nil?
      job = {}
    else
      job = { worker_name => job }
    end
  end
  job
end

.get_with_ids(ids) ⇒ Object



213
214
215
# File 'lib/delayed/backend/redis/job.rb', line 213

def self.get_with_ids(ids)
  ids.map { |id| self.instantiate_from_attrs(redis.hgetall(key_for_job_id(id))) }
end

.instantiate(attrs) ⇒ Object



142
143
144
145
146
# File 'lib/delayed/backend/redis/job.rb', line 142

def self.instantiate(attrs)
  result = new(attrs)
  result.instance_variable_set(:@new_record, false)
  result
end

.jobs_count(flavor, queue = Delayed::Settings.queue) ⇒ Object

get the total job count for the given flavor flavor is :current, :future or :failed for the :failed flavor, queue is currently ignored



299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
# File 'lib/delayed/backend/redis/job.rb', line 299

def self.jobs_count(flavor,
    queue = Delayed::Settings.queue)
  case flavor.to_s
    when 'current'
      check_queue(queue)
      redis.zcard(Keys::QUEUE[queue])
    when 'future'
      check_queue(queue)
      redis.zcard(Keys::FUTURE_QUEUE[queue])
    when 'failed'
      redis.zcard(Keys::FAILED_JOBS)
    else
      raise ArgumentError, "invalid flavor: #{flavor.inspect}"
  end
end

.key_for_job_id(job_id) ⇒ Object



217
218
219
# File 'lib/delayed/backend/redis/job.rb', line 217

def self.key_for_job_id(job_id)
  Keys::JOB[job_id]
end

.list_jobs(flavor, limit, offset = 0, query = nil) ⇒ Object

get a list of jobs of the given flavor in the given queue flavor is :current, :future, :failed, :strand or :tag depending on the flavor, query has a different meaning: for :current and :future, it’s the queue name (defaults to Delayed::Settings.queue) for :strand it’s the strand name for :tag it’s the tag name for :failed it’s ignored



270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
# File 'lib/delayed/backend/redis/job.rb', line 270

def self.list_jobs(flavor,
    limit,
    offset = 0,
    query = nil)
  case flavor.to_s
    when 'current'
      query ||= Delayed::Settings.queue
      check_queue(query)
      self.find(functions.find_available(query, limit, offset, 0, "+inf", db_time_now))
    when 'future'
      query ||= Delayed::Settings.queue
      check_queue(query)
      self.find(redis.zrangebyscore(Keys::FUTURE_QUEUE[query], 0, "+inf", :limit => [offset, limit]))
    when 'failed'
      Failed.find(redis.zrevrangebyscore(Keys::FAILED_JOBS, "+inf", 0, :limit => [offset, limit]))
    when 'strand'
      self.find(redis.lrange(Keys::STRAND[query], offset, offset + limit - 1))
    when 'tag'
      # This is optimized for writing, since list_jobs(:tag) will only ever happen in the admin UI
      ids = redis.smembers(Keys::TAG[query])
      self.find(ids[offset, limit])
    else
      raise ArgumentError, "invalid flavor: #{flavor.inspect}"
  end
end

.reconnect!Object



192
193
194
195
196
197
198
# File 'lib/delayed/backend/redis/job.rb', line 192

def self.reconnect!
  # redis cluster responds to reconnect directly,
  # but individual redis needs it to be called on client
  redis.respond_to?(:reconnect) ?
    redis.reconnect :
    redis.client.reconnect
end

.running_jobsObject



319
320
321
# File 'lib/delayed/backend/redis/job.rb', line 319

def self.running_jobs()
  self.find(redis.zrangebyscore(Keys::RUNNING_JOBS, 0, "+inf"))
end

.strand_size(strand) ⇒ Object



315
316
317
# File 'lib/delayed/backend/redis/job.rb', line 315

def self.strand_size(strand)
  redis.llen(Keys::STRAND[strand])
end

.tag_counts(flavor, limit, offset = 0) ⇒ Object

returns a list of hashes { :tag => tag_name, :count => current_count } in descending count order flavor is :current or :all

Raises:

  • (ArgumentError)


334
335
336
337
338
339
340
# File 'lib/delayed/backend/redis/job.rb', line 334

def self.tag_counts(flavor,
    limit,
    offset = 0)
  raise(ArgumentError, "invalid flavor: #{flavor.inspect}") unless %w(current all).include?(flavor.to_s)
  key = Keys::TAG_COUNTS[flavor]
  redis.zrevrangebyscore(key, '+inf', 1, :limit => [offset, limit], :withscores => true).map { |tag, count| { :tag => tag, :count => count } }
end

.unlock(jobs) ⇒ Object



360
361
362
363
# File 'lib/delayed/backend/redis/job.rb', line 360

def self.unlock(jobs)
  jobs.each(&:unlock!)
  jobs.length
end

Instance Method Details

#==(other) ⇒ Object



184
185
186
# File 'lib/delayed/backend/redis/job.rb', line 184

def ==(other)
  other.is_a?(self.class) && id == other.id
end

#[](key) ⇒ Object



160
161
162
# File 'lib/delayed/backend/redis/job.rb', line 160

def [](key)
  send(key)
end

#[]=(key, value) ⇒ Object



164
165
166
# File 'lib/delayed/backend/redis/job.rb', line 164

def []=(key, value)
  send("#{key}=", value)
end

#changes_appliedObject



397
398
399
400
# File 'lib/delayed/backend/redis/job.rb', line 397

def changes_applied
  @previously_changed = changes
  @changed_attributes.clear
end

#create_and_lock!(worker_name) ⇒ Object



421
422
423
424
# File 'lib/delayed/backend/redis/job.rb', line 421

def create_and_lock!(worker_name)
  raise "job already exists" unless new_record?
  lock_in_redis!(worker_name)
end

#destroyObject



407
408
409
410
411
# File 'lib/delayed/backend/redis/job.rb', line 407

def destroy
  self.class.functions.destroy_job(id, self.class.db_time_now)
  @destroyed = true
  freeze
end

#destroyed?Boolean

Returns:

  • (Boolean)


180
181
182
# File 'lib/delayed/backend/redis/job.rb', line 180

def destroyed?
  !!@destroyed
end

#fail!Object



426
427
428
429
430
431
432
# File 'lib/delayed/backend/redis/job.rb', line 426

def fail!
  self.failed_at = self.class.db_time_now
  save!
  redis.rename Keys::JOB[id], Keys::FAILED_JOB[id]
  tickle_strand
  self
end

#hashObject



188
189
190
# File 'lib/delayed/backend/redis/job.rb', line 188

def hash
  id.hash
end

#lock_in_redis!(worker_name) ⇒ Object



372
373
374
375
376
# File 'lib/delayed/backend/redis/job.rb', line 372

def lock_in_redis!(worker_name)
  self.locked_at = self.class.db_time_now
  self.locked_by = worker_name
  save
end

#new_record?Boolean

Returns:

  • (Boolean)


176
177
178
# File 'lib/delayed/backend/redis/job.rb', line 176

def new_record?
  !!@new_record
end

#save(*a) ⇒ Object



383
384
385
386
387
388
389
390
391
392
393
394
# File 'lib/delayed/backend/redis/job.rb', line 383

def save(*a)
  return false if destroyed?
  result = run_callbacks(:save) do
    if new_record?
      run_callbacks(:create) { create }
    else
      update
    end
  end
  changes_applied
  result
end

#save!(*a) ⇒ Object



403
404
405
# File 'lib/delayed/backend/redis/job.rb', line 403

def save!(*a)
  save(*a) || raise(RecordNotSaved)
end

#tickle_strandObject

take this job off the strand, and queue up the next strand job if this job was at the front



415
416
417
418
419
# File 'lib/delayed/backend/redis/job.rb', line 415

def tickle_strand
  if strand.present?
    self.class.functions.tickle_strand(id, strand, self.class.db_time_now)
  end
end

#transfer_lock!(from:, to:) ⇒ Object



368
369
370
# File 'lib/delayed/backend/redis/job.rb', line 368

def transfer_lock!(from:, to:)
  lock_in_redis!(to)
end

#unlock!Object



378
379
380
381
# File 'lib/delayed/backend/redis/job.rb', line 378

def unlock!
  unlock
  save!
end