Class: Delayed::Backend::Redis::Job

Inherits:
Object
  • Object
show all
Extended by:
ActiveModel::Callbacks
Includes:
ActiveModel::Dirty, Base
Defined in:
lib/delayed/backend/redis/job.rb

Direct Known Subclasses

Failed

Defined Under Namespace

Modules: Keys Classes: Failed

Constant Summary collapse

WAITING_STRAND_JOB_PRIORITY =
2000000
COLUMNS =
[]
TIMESTAMP_COLUMNS =

We store time attributes in redis as floats so we don’t have to do timestamp parsing in lua.

[]
INTEGER_COLUMNS =
[]

Constants included from Base

Base::ON_HOLD_COUNT, Base::ON_HOLD_LOCKED_BY

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Base

#batch?, #expired?, #failed?, #full_name, #hold!, included, #initialize_defaults, #invoke_job, #locked?, #name, #on_hold?, #payload_object, #payload_object=, #permanent_failure, #reschedule, #reschedule_at, #unhold!, #unlock

Constructor Details

#initialize(attrs = {}) ⇒ Job

Returns a new instance of Job.



135
136
137
138
139
140
# File 'lib/delayed/backend/redis/job.rb', line 135

# Builds a new, unsaved job.
#
# Each key in +attrs+ is assigned through the matching "<key>=" writer; after
# that, priority and attempts fall back to 0 when they are still unset.
#
# @param attrs [Hash] attribute names mapped to their initial values
def initialize(attrs = {})
  attrs.each do |attr_name, attr_value|
    send("#{attr_name}=", attr_value)
  end
  self.priority = 0 unless priority
  self.attempts = 0 unless attempts
  # Flag read back by new_record?; instantiate flips it to false.
  @new_record = true
end

Instance Attribute Details

#singleton ⇒ Object

not saved, just used as a marker when creating



344
345
346
# File 'lib/delayed/backend/redis/job.rb', line 344

# Reader for the @singleton marker. Per the attribute notes it is not saved;
# it is only used as a marker when creating a job.
def singleton
  @singleton
end

Class Method Details

.bulk_update(action, opts) ⇒ Object

Perform a bulk update of a set of jobs. `action` is :hold, :unhold, or :destroy. To specify the jobs to act on, either pass opts[:ids] = [list of job ids], or pass opts[:flavor] = <some flavor> to act on all jobs of that flavor.

see the list_jobs action for the list of available flavors and the meaning of opts for each



332
333
334
335
336
337
# File 'lib/delayed/backend/redis/job.rb', line 332

# Applies +action+ to a set of jobs by delegating to functions.bulk_update.
#
# The jobs are selected either by opts[:ids] or by opts[:flavor] (optionally
# narrowed by opts[:query]). For the "current" and "future" flavors a missing
# query falls back to Delayed::Settings.queue.
#
# The caller's +opts+ hash is only read, never modified, so frozen or shared
# hashes are safe to pass.
#
# @param action [Symbol] the bulk action, passed through unchanged
# @param opts [Hash] may contain :ids, :flavor and :query
# @return whatever functions.bulk_update returns
def self.bulk_update(action, opts)
  query = opts[:query]
  if %w(current future).include?(opts[:flavor].to_s)
    query ||= Delayed::Settings.queue
  end
  functions.bulk_update(action, opts[:ids], opts[:flavor], query, db_time_now)
end

.clear_locks!(worker_name) ⇒ Object



306
307
308
309
310
311
312
# File 'lib/delayed/backend/redis/job.rb', line 306

# Unlocks every running job whose locked_by equals +worker_name+.
#
# @param worker_name the worker whose locks should be released
# @return [nil] always
def self.clear_locks!(worker_name)
  running_jobs.each do |job|
    next unless job.locked_by == worker_name

    # TODO: mark the job as failed one attempt
    job.unlock!
  end
  nil
end

.column(name, type) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/delayed/backend/redis/job.rb', line 97

# Declares a job attribute called +name+.
#
# The name is recorded in COLUMNS, and additionally in TIMESTAMP_COLUMNS or
# INTEGER_COLUMNS when +type+ is :timestamp or :integer. A reader is defined,
# the attribute is registered via define_attribute_methods, and a writer is
# generated that calls "<name>_will_change!" before storing a differing value.
#
# @param name attribute name
# @param type attribute type; only :timestamp and :integer get extra handling
def self.column(name, type)
  COLUMNS << name

  case type
  when :timestamp then TIMESTAMP_COLUMNS << name
  when :integer then INTEGER_COLUMNS << name
  end

  attr_reader(name)
  define_attribute_methods([name])
  # Generated writer: flags the attribute as changed only when the value differs.
  class_eval(<<-EOS, __FILE__, __LINE__ + 1)
    def #{name}=(new_value)
      #{name}_will_change! unless new_value == self.#{name}
      @#{name} = new_value
    end
  EOS
end

.create(attrs = {}) ⇒ Object



148
149
150
151
152
# File 'lib/delayed/backend/redis/job.rb', line 148

# Builds a job from +attrs+ and calls save on it.
#
# @param attrs [Hash] attributes handed to new
# @return the job instance, whatever save returned
def self.create(attrs = {})
  new(attrs).tap(&:save)
end

.create!(attrs = {}) ⇒ Object



154
155
156
157
158
# File 'lib/delayed/backend/redis/job.rb', line 154

# Builds a job from +attrs+ and calls save! on it.
#
# @param attrs [Hash] attributes handed to new
# @return the job instance; any error raised by save! propagates
def self.create!(attrs = {})
  new(attrs).tap(&:save!)
end

.create_singleton(options) ⇒ Object



339
340
341
# File 'lib/delayed/backend/redis/job.rb', line 339

# Creates a job via create! with :singleton forced to true.
#
# @param options [Hash] job attributes; merged into a copy, not modified
# @return whatever create! returns
def self.create_singleton(options)
  singleton_options = options.merge(:singleton => true)
  create!(singleton_options)
end

.find(ids) ⇒ Object



168
169
170
171
172
173
174
# File 'lib/delayed/backend/redis/job.rb', line 168

# Looks up one job or several, depending on the argument.
#
# @param ids an Array of ids (handled by find_some) or a single id (handled
#   by find_one)
# @return the result of find_some or find_one
def self.find(ids)
  case ids
  when Array then find_some(ids, {})
  else find_one(ids, {})
  end
end

.find_available(limit, queue = Delayed::Settings.queue, min_priority = Delayed::MIN_PRIORITY, max_priority = Delayed::MAX_PRIORITY) ⇒ Object



235
236
237
238
239
240
241
242
243
244
# File 'lib/delayed/backend/redis/job.rb', line 235

# Finds up to +limit+ jobs in +queue+ within the given priority bounds.
#
# The queue and the priority range are validated first (check_queue,
# check_priorities). functions.find_available is then asked for matching ids,
# with 0 in its third argument, and the ids are loaded through find.
#
# @param limit maximum number of jobs requested
# @param queue queue name
# @param min_priority lower priority bound
# @param max_priority upper priority bound
# @return the jobs returned by find
def self.find_available(limit,
    queue = Delayed::Settings.queue,
    min_priority = Delayed::MIN_PRIORITY,
    max_priority = Delayed::MAX_PRIORITY)
  check_queue(queue)
  check_priorities(min_priority, max_priority)

  available_ids = functions.find_available(queue, limit, 0, min_priority, max_priority, db_time_now)
  find(available_ids)
end

.find_one(id, options) ⇒ Object



204
205
206
207
# File 'lib/delayed/backend/redis/job.rb', line 204

# Loads the job with the given id.
#
# @param id the job id
# @param options accepted but not used by this method
# @return the job
# @raise [ActiveRecord::RecordNotFound] when no job is found for +id+
def self.find_one(id, options)
  found = get_with_ids([id]).first
  return found if found

  raise ActiveRecord::RecordNotFound, "Couldn't find Job with ID=#{id}"
end

.find_some(ids, options) ⇒ Object



209
210
211
# File 'lib/delayed/backend/redis/job.rb', line 209

# Loads the jobs for +ids+, dropping ids that produced no job (nil entries).
#
# @param ids [Array] job ids
# @param options accepted but not used by this method
# @return [Array] the jobs that were found
def self.find_some(ids, options)
  loaded = get_with_ids(ids)
  loaded.compact
end

.functions ⇒ Object



200
201
202
# File 'lib/delayed/backend/redis/job.rb', line 200

# Returns the memoized Delayed::Backend::Redis::Functions instance, building
# it from the current redis connection on first use. It is held in a class
# variable (@@functions).
def self.functions
  return @@functions if defined?(@@functions) && @@functions

  @@functions = Delayed::Backend::Redis::Functions.new(redis)
end

.get_and_lock_next_available(worker_name, queue = Delayed::Settings.queue, min_priority = Delayed::MIN_PRIORITY, max_priority = Delayed::MAX_PRIORITY) ⇒ Object



221
222
223
224
225
226
227
228
229
230
231
232
233
# File 'lib/delayed/backend/redis/job.rb', line 221

# Fetches and locks the next available job for +worker_name+.
#
# The queue and the priority range are validated first. The work is delegated
# to functions.get_and_lock_next_available, and the attributes it returns are
# turned into a job with instantiate_from_attrs.
#
# @param worker_name name of the worker taking the lock
# @param queue queue name
# @param min_priority lower priority bound
# @param max_priority upper priority bound
# @return the locked job, or nil when no attributes came back
def self.get_and_lock_next_available(worker_name,
    queue = Delayed::Settings.queue,
    min_priority = Delayed::MIN_PRIORITY,
    max_priority = Delayed::MAX_PRIORITY)
  check_queue(queue)
  check_priorities(min_priority, max_priority)

  # As an optimization the lua function hands back the full hash of job
  # attributes rather than just a job id, which saves a round trip.
  locked_attrs = functions.get_and_lock_next_available(worker_name, queue, min_priority, max_priority, db_time_now)
  # instantiate_from_attrs returns nil if the attrs are blank
  instantiate_from_attrs(locked_attrs)
end

.get_with_ids(ids) ⇒ Object



213
214
215
# File 'lib/delayed/backend/redis/job.rb', line 213

# Loads each id's attribute hash from redis (one HGETALL per id) and builds a
# job from it with instantiate_from_attrs.
#
# @param ids [Array] job ids
# @return [Array] one entry per id, in the same order, each being whatever
#   instantiate_from_attrs returned for that id
def self.get_with_ids(ids)
  ids.map do |job_id|
    stored_attrs = redis.hgetall(key_for_job_id(job_id))
    instantiate_from_attrs(stored_attrs)
  end
end

.instantiate(attrs) ⇒ Object



142
143
144
145
146
# File 'lib/delayed/backend/redis/job.rb', line 142

# Builds a job from +attrs+ and marks it as not new by setting @new_record to
# false, so that new_record? returns false for it.
#
# @param attrs [Hash] attributes handed to new
# @return the job instance
def self.instantiate(attrs)
  new(attrs).tap do |job|
    job.instance_variable_set(:@new_record, false)
  end
end

.jobs_count(flavor, queue = Delayed::Settings.queue) ⇒ Object

Get the total job count for the given flavor. `flavor` is :current, :future or :failed. For the :failed flavor, `queue` is currently ignored.



282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
# File 'lib/delayed/backend/redis/job.rb', line 282

# Counts the jobs of the given flavor with ZCARD on the matching sorted set.
#
# @param flavor :current, :future or :failed (compared as a string)
# @param queue queue name; validated and used for :current and :future,
#   ignored for :failed
# @return the value returned by redis.zcard
# @raise [ArgumentError] for any other flavor
def self.jobs_count(flavor,
    queue = Delayed::Settings.queue)
  set_key =
    case flavor.to_s
    when 'current'
      check_queue(queue)
      Keys::QUEUE[queue]
    when 'future'
      check_queue(queue)
      Keys::FUTURE_QUEUE[queue]
    when 'failed'
      Keys::FAILED_JOBS
    else
      raise ArgumentError, "invalid flavor: #{flavor.inspect}"
    end

  redis.zcard(set_key)
end

.key_for_job_id(job_id) ⇒ Object



217
218
219
# File 'lib/delayed/backend/redis/job.rb', line 217

# Returns the redis key for +job_id+, looked up through Keys::JOB.
# get_with_ids reads the job's attribute hash from this key.
def self.key_for_job_id(job_id)
  Keys::JOB[job_id]
end

.list_jobs(flavor, limit, offset = 0, query = nil) ⇒ Object

Get a list of jobs of the given flavor in the given queue. `flavor` is :current, :future, :failed, :strand or :tag. Depending on the flavor, `query` has a different meaning: for :current and :future, it’s the queue name (defaults to Delayed::Settings.queue); for :strand, it’s the strand name; for :tag, it’s the tag name; for :failed, it’s ignored.



253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
# File 'lib/delayed/backend/redis/job.rb', line 253

# Lists one page of jobs of the given flavor.
#
# The meaning of +query+ depends on the flavor:
# * current / future - the queue name (defaults to Delayed::Settings.queue)
# * strand           - the strand name
# * tag              - the tag name
# * failed           - ignored
#
# @param flavor :current, :future, :failed, :strand or :tag (compared as a string)
# @param limit maximum number of jobs to return
# @param offset number of jobs to skip
# @param query see above
# @return [Array] the jobs on the requested page; a :tag page past the end of
#   the tag's members is an empty Array
# @raise [ArgumentError] for any other flavor
def self.list_jobs(flavor,
    limit,
    offset = 0,
    query = nil)
  case flavor.to_s
    when 'current'
      query ||= Delayed::Settings.queue
      check_queue(query)
      self.find(functions.find_available(query, limit, offset, 0, "+inf", db_time_now))
    when 'future'
      query ||= Delayed::Settings.queue
      check_queue(query)
      self.find(redis.zrangebyscore(Keys::FUTURE_QUEUE[query], 0, "+inf", :limit => [offset, limit]))
    when 'failed'
      Failed.find(redis.zrevrangebyscore(Keys::FAILED_JOBS, "+inf", 0, :limit => [offset, limit]))
    when 'strand'
      self.find(redis.lrange(Keys::STRAND[query], offset, offset + limit - 1))
    when 'tag'
      # This is optimized for writing, since list_jobs(:tag) will only ever happen in the admin UI
      ids = redis.smembers(Keys::TAG[query])
      # Array#[] returns nil when offset is past the end; without the fallback
      # find would treat nil as a single id instead of an empty page.
      self.find(ids[offset, limit] || [])
    else
      raise ArgumentError, "invalid flavor: #{flavor.inspect}"
  end
end

.reconnect! ⇒ Object



192
193
194
195
196
197
198
# File 'lib/delayed/backend/redis/job.rb', line 192

# Re-establishes the redis connection.
#
# A redis cluster responds to reconnect directly, whereas an individual redis
# connection needs reconnect to be called on its client.
#
# @return whatever the underlying reconnect call returns
def self.reconnect!
  if redis.respond_to?(:reconnect)
    redis.reconnect
  else
    redis.client.reconnect
  end
end

.running_jobs ⇒ Object



302
303
304
# File 'lib/delayed/backend/redis/job.rb', line 302

# Loads every job whose id is in the Keys::RUNNING_JOBS sorted set with a
# score between 0 and +inf.
#
# @return the jobs returned by find
def self.running_jobs
  running_ids = redis.zrangebyscore(Keys::RUNNING_JOBS, 0, "+inf")
  find(running_ids)
end

.strand_size(strand) ⇒ Object



298
299
300
# File 'lib/delayed/backend/redis/job.rb', line 298

# Returns the length (LLEN) of the redis list stored at Keys::STRAND[strand].
def self.strand_size(strand)
  redis.llen(Keys::STRAND[strand])
end

.tag_counts(flavor, limit, offset = 0) ⇒ Object

Returns a list of hashes { :tag => tag_name, :count => current_count } in descending count order. `flavor` is :current or :all.

Raises:

  • (ArgumentError)


317
318
319
320
321
322
323
# File 'lib/delayed/backend/redis/job.rb', line 317

# Lists tags with their counts, highest count first.
#
# Reads the Keys::TAG_COUNTS[flavor] sorted set from +inf down to a score of
# 1, with scores included, and wraps each (tag, score) pair in a hash.
#
# @param flavor :current or :all (validated as a string)
# @param limit maximum number of tags to return
# @param offset number of tags to skip
# @return [Array<Hash>] hashes of the form { :tag => tag, :count => count },
#   where count is the score as returned by redis
# @raise [ArgumentError] for any other flavor
def self.tag_counts(flavor,
    limit,
    offset = 0)
  unless %w(current all).include?(flavor.to_s)
    raise ArgumentError, "invalid flavor: #{flavor.inspect}"
  end

  counts_key = Keys::TAG_COUNTS[flavor]
  scored_tags = redis.zrevrangebyscore(counts_key, '+inf', 1, :limit => [offset, limit], :withscores => true)
  scored_tags.map do |tag, count|
    { :tag => tag, :count => count }
  end
end

Instance Method Details

#==(other) ⇒ Object



184
185
186
# File 'lib/delayed/backend/redis/job.rb', line 184

# Two jobs are equal when +other+ is an instance of this job's class (or a
# subclass of it) and both have the same id.
#
# eql? is aliased to this method because the class also overrides #hash
# (based on id). Hash, Set and Array#uniq compare candidates with eql?, so
# without the alias two equal jobs would hash alike yet still be treated as
# different keys.
#
# @param other the object to compare against
# @return [Boolean]
def ==(other)
  other.is_a?(self.class) && id == other.id
end
alias :eql? :==

#[](key) ⇒ Object



160
161
162
# File 'lib/delayed/backend/redis/job.rb', line 160

# Hash-style attribute read: calls the method named +key+ and returns its
# result. Dispatch goes through send, so any method name is accepted, not only
# column readers.
def [](key)
  send(key)
end

#[]=(key, value) ⇒ Object



164
165
166
# File 'lib/delayed/backend/redis/job.rb', line 164

# Hash-style attribute write: calls the "<key>=" writer with +value+.
# Dispatch goes through send, so any "<key>=" method is reachable.
def []=(key, value)
  send("#{key}=", value)
end

#changes_applied ⇒ Object



372
373
374
375
# File 'lib/delayed/backend/redis/job.rb', line 372

# Marks the current changes as applied (called by save).
#
# The order matters: the pending changes are snapshotted into
# @previously_changed first, and only then is @changed_attributes emptied.
# NOTE(review): `changes` and @changed_attributes presumably come from the
# included ActiveModel::Dirty - confirm against the ActiveModel version in use.
def changes_applied
  @previously_changed = changes
  @changed_attributes.clear
end

#create_and_lock!(worker_name) ⇒ Object



396
397
398
399
# File 'lib/delayed/backend/redis/job.rb', line 396

# Locks a brand-new job for +worker_name+ via lock_in_redis!.
#
# @param worker_name name of the worker taking the lock
# @return whatever lock_in_redis! returns
# @raise [RuntimeError] "job already exists" when the job is not a new record
def create_and_lock!(worker_name)
  if new_record?
    lock_in_redis!(worker_name)
  else
    raise "job already exists"
  end
end

#destroy ⇒ Object



382
383
384
385
386
# File 'lib/delayed/backend/redis/job.rb', line 382

# Destroys the job through functions.destroy_job, then marks this instance as
# destroyed and freezes it.
#
# @return [self] the frozen instance
def destroy
  job_class = self.class
  job_class.functions.destroy_job(id, job_class.db_time_now)
  # Must be set before freezing; destroyed? reads it afterwards.
  @destroyed = true
  freeze
end

#destroyed? ⇒ Boolean

Returns:

  • (Boolean)


180
181
182
# File 'lib/delayed/backend/redis/job.rb', line 180

# Whether destroy has been called on this instance.
#
# @return [Boolean] true once @destroyed is set to a truthy value, else false
def destroyed?
  @destroyed ? true : false
end

#fail! ⇒ Object



401
402
403
404
405
406
407
# File 'lib/delayed/backend/redis/job.rb', line 401

# Marks the job as failed.
#
# Stamps failed_at with the current time and saves, renames the job's redis
# key from Keys::JOB[id] to Keys::FAILED_JOB[id], then tickles the strand.
#
# @return [self]
def fail!
  failure_time = self.class.db_time_now
  self.failed_at = failure_time
  save!
  redis.rename(Keys::JOB[id], Keys::FAILED_JOB[id])
  tickle_strand
  self
end

#hash ⇒ Object



188
189
190
# File 'lib/delayed/backend/redis/job.rb', line 188

# Hash value derived from the job id, so jobs that share an id produce the
# same hash value.
def hash
  id.hash
end

#lock_in_redis!(worker_name) ⇒ Object



346
347
348
349
350
# File 'lib/delayed/backend/redis/job.rb', line 346

# Records the lock on this job and saves it.
#
# Sets locked_at to the current time and locked_by to +worker_name+. The save
# is the non-raising variant, so the return value is whatever save returns.
#
# @param worker_name name of the worker taking the lock
def lock_in_redis!(worker_name)
  lock_time = self.class.db_time_now
  self.locked_at = lock_time
  self.locked_by = worker_name
  save
end

#new_record? ⇒ Boolean

Returns:

  • (Boolean)


176
177
178
# File 'lib/delayed/backend/redis/job.rb', line 176

# Whether this instance is flagged as a new record. The flag is set to true in
# initialize and to false by instantiate.
#
# @return [Boolean]
def new_record?
  @new_record ? true : false
end

#save(*a) ⇒ Object



358
359
360
361
362
363
364
365
366
367
368
369
# File 'lib/delayed/backend/redis/job.rb', line 358

# Persists the job inside the :save callbacks.
#
# New records are created (wrapped in the :create callbacks); existing records
# are updated. changes_applied is called afterwards in both cases.
#
# @param a ignored; accepted for call compatibility
# @return false when the job has been destroyed, otherwise the value returned
#   by the :save callback chain
def save(*a)
  return false if destroyed?

  outcome = run_callbacks(:save) do
    next update unless new_record?

    run_callbacks(:create) { create }
  end
  changes_applied
  outcome
end

#save!(*a) ⇒ Object



378
379
380
# File 'lib/delayed/backend/redis/job.rb', line 378

# Like save, but raises instead of returning a falsy result.
#
# @param a forwarded to save
# @return the truthy value returned by save
# @raise [RecordNotSaved] when save returns false or nil
def save!(*a)
  saved = save(*a)
  raise RecordNotSaved unless saved

  saved
end

#tickle_strand ⇒ Object

take this job off the strand, and queue up the next strand job if this job was at the front



390
391
392
393
394
# File 'lib/delayed/backend/redis/job.rb', line 390

# Takes this job off its strand and queues up the next strand job if this job
# was at the front. Does nothing for a job without a strand.
#
# @return nil when the strand is blank, otherwise whatever
#   functions.tickle_strand returns
def tickle_strand
  return unless strand.present?

  self.class.functions.tickle_strand(id, strand, self.class.db_time_now)
end

#unlock! ⇒ Object



352
353
354
355
356
# File 'lib/delayed/backend/redis/job.rb', line 352

# Releases the lock on this job: clears locked_by and locked_at, then saves
# with save! so a failed save raises.
#
# @return whatever save! returns
def unlock!
  self.locked_by = nil
  self.locked_at = nil
  save!
end