Class: Delayed::Backend::Redis::Job
- Inherits:
-
Object
- Object
- Delayed::Backend::Redis::Job
show all
- Extended by:
- ActiveModel::Callbacks
- Includes:
- ActiveModel::Dirty, Base
- Defined in:
- lib/delayed/backend/redis/job.rb
Direct Known Subclasses
Failed
Defined Under Namespace
Modules: Keys
Classes: Failed
Constant Summary
collapse
- WAITING_STRAND_JOB_PRIORITY =
2000000
- COLUMNS =
[]
- TIMESTAMP_COLUMNS =
We store time attributes in redis as floats so we don’t have to do timestamp parsing in lua.
[]
- INTEGER_COLUMNS =
[]
Constants included
from Base
Base::ON_HOLD_COUNT, Base::ON_HOLD_LOCKED_BY
Instance Attribute Summary collapse
Class Method Summary
collapse
-
.bulk_update(action, opts) ⇒ Object
Perform a bulk update of a set of jobs. action is :hold, :unhold, or :destroy. To specify the jobs to act on, either pass opts[:ids] = [list of job ids] or opts[:flavor] = <some flavor> to perform on all jobs of that flavor.
-
.clear_locks!(worker_name) ⇒ Object
-
.column(name, type) ⇒ Object
-
.create(attrs = {}) ⇒ Object
-
.create!(attrs = {}) ⇒ Object
-
.create_singleton(options) ⇒ Object
-
.find(ids) ⇒ Object
-
.find_available(limit, queue = Delayed::Settings.queue, min_priority = Delayed::MIN_PRIORITY, max_priority = Delayed::MAX_PRIORITY) ⇒ Object
-
.find_one(id, options) ⇒ Object
-
.find_some(ids, options) ⇒ Object
-
.functions ⇒ Object
-
.get_and_lock_next_available(worker_name, queue = Delayed::Settings.queue, min_priority = Delayed::MIN_PRIORITY, max_priority = Delayed::MAX_PRIORITY, prefetch: nil, prefetch_owner: nil, forced_latency: nil) ⇒ Object
-
.get_with_ids(ids) ⇒ Object
-
.instantiate(attrs) ⇒ Object
-
.jobs_count(flavor, queue = Delayed::Settings.queue) ⇒ Object
Get the total job count for the given flavor. flavor is :current, :future or :failed. For the :failed flavor, queue is currently ignored.
-
.key_for_job_id(job_id) ⇒ Object
-
.list_jobs(flavor, limit, offset = 0, query = nil) ⇒ Object
Get a list of jobs of the given flavor in the given queue. flavor is :current, :future, :failed, :strand or :tag. Depending on the flavor, query has a different meaning: for :current and :future, it’s the queue name (defaults to Delayed::Settings.queue); for :strand it’s the strand name; for :tag it’s the tag name; for :failed it’s ignored.
-
.reconnect! ⇒ Object
-
.running_jobs ⇒ Object
-
.strand_size(strand) ⇒ Object
-
.tag_counts(flavor, limit, offset = 0) ⇒ Object
Returns a list of hashes { :tag => tag_name, :count => current_count } in descending count order. flavor is :current or :all.
-
.unlock(jobs) ⇒ Object
Instance Method Summary
collapse
Methods included from Base
#batch?, #expired?, #failed?, #full_name, #hold!, included, #inferred_max_attempts, #initialize_defaults, #invoke_job, #locked?, #name, #on_hold?, #payload_object, #payload_object=, #permanent_failure, #reschedule, #reschedule_at, #unhold!, #unlock
Constructor Details
#initialize(attrs = {}) ⇒ Job
Returns a new instance of Job.
137
138
139
140
141
142
|
# File 'lib/delayed/backend/redis/job.rb', line 137
# Builds a new, unsaved job from +attrs+.
#
# Every key/value pair is assigned through the matching "<key>=" writer;
# priority and attempts then fall back to 0 when they are still unset.
# The instance is flagged as a new record.
def initialize(attrs = {})
  attrs.each do |attribute, value|
    send("#{attribute}=", value)
  end
  self.priority = 0 unless priority
  self.attempts = 0 unless attempts
  @new_record = true
end
|
Instance Attribute Details
#on_conflict ⇒ Object
not saved, just used as a marker when creating
368
369
370
|
# File 'lib/delayed/backend/redis/job.rb', line 368
# Reader for the @on_conflict marker. Per this attribute's documentation the
# value is not saved; it is only used as a marker when creating a job.
def on_conflict
@on_conflict
end
|
#singleton ⇒ Object
not saved, just used as a marker when creating
368
369
370
|
# File 'lib/delayed/backend/redis/job.rb', line 368
# Reader for the @singleton marker. Per this attribute's documentation the
# value is not saved; it is only used as a marker when creating a job
# (create_singleton passes :singleton => true to create!).
def singleton
@singleton
end
|
Class Method Details
.bulk_update(action, opts) ⇒ Object
Perform a bulk update of a set of jobs. action is :hold, :unhold, or :destroy. To specify the jobs to act on, either pass opts[:ids] = [list of job ids] or opts[:flavor] = <some flavor> to perform on all jobs of that flavor.
See the list_jobs action for the list of available flavors and the meaning of opts[:query] for each.
351
352
353
354
355
356
|
# File 'lib/delayed/backend/redis/job.rb', line 351
# Performs +action+ on a set of jobs by delegating to functions.bulk_update.
#
# +opts+ is read as a hash with the keys :ids, :flavor and :query. For the
# "current" and "future" flavors a missing :query falls back to
# Delayed::Settings.queue.
#
# The fallback is computed in a local so the caller's hash is never modified
# (the previous `opts[:query] ||= ...` wrote into the argument and raised on
# a frozen hash).
#
# Returns whatever functions.bulk_update returns.
def self.bulk_update(action, opts)
  query = opts[:query]
  query ||= Delayed::Settings.queue if %w(current future).include?(opts[:flavor].to_s)
  functions.bulk_update(action, opts[:ids], opts[:flavor], query, db_time_now)
end
|
.clear_locks!(worker_name) ⇒ Object
325
326
327
328
329
330
331
|
# File 'lib/delayed/backend/redis/job.rb', line 325
# Calls unlock! on every running job whose locked_by equals +worker_name+.
# Jobs locked by other workers are left untouched. Always returns nil.
def self.clear_locks!(worker_name)
  running_jobs.each do |running_job|
    next unless running_job.locked_by == worker_name

    running_job.unlock!
  end
  nil
end
|
.column(name, type) ⇒ Object
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
# File 'lib/delayed/backend/redis/job.rb', line 99
# Declares a job attribute +name+ of the given +type+.
#
# Records the name in COLUMNS (and additionally in TIMESTAMP_COLUMNS or
# INTEGER_COLUMNS for the :timestamp / :integer types), defines a reader,
# registers the attribute through define_attribute_methods, and generates a
# "<name>=" writer that calls "<name>_will_change!" before storing a value
# that differs from the current one.
def self.column(name, type)
COLUMNS << name
if type == :timestamp
TIMESTAMP_COLUMNS << name
elsif type == :integer
INTEGER_COLUMNS << name
end
attr_reader(name)
define_attribute_methods([name])
# The writer is evaluated from a string so the attribute name can be
# interpolated; __FILE__ and __LINE__ + 1 attribute it to the heredoc below.
class_eval(<<-EOS, __FILE__, __LINE__ + 1)
def #{name}=(new_value)
#{name}_will_change! unless new_value == self.#{name}
@#{name} = new_value
end
EOS
end
|
.create(attrs = {}) ⇒ Object
150
151
152
153
154
|
# File 'lib/delayed/backend/redis/job.rb', line 150
# Builds a job from +attrs+, calls #save on it, and returns the job itself
# (the result of #save is discarded).
def self.create(attrs = {})
  new(attrs).tap(&:save)
end
|
.create!(attrs = {}) ⇒ Object
156
157
158
159
160
|
# File 'lib/delayed/backend/redis/job.rb', line 156
# Builds a job from +attrs+, calls #save! on it, and returns the job.
# Any exception raised by #save! propagates to the caller.
def self.create!(attrs = {})
  new(attrs).tap(&:save!)
end
|
.create_singleton(options) ⇒ Object
358
359
360
|
# File 'lib/delayed/backend/redis/job.rb', line 358
# Creates a job through create! with +options+ plus :singleton => true.
# +options+ itself is not modified (merge returns a new hash).
def self.create_singleton(options)
  singleton_attrs = options.merge(singleton: true)
  create!(singleton_attrs)
end
|
.find(ids) ⇒ Object
170
171
172
173
174
175
176
|
# File 'lib/delayed/backend/redis/job.rb', line 170
# Looks up jobs by id. An Array argument is handed to find_some; anything
# else is treated as a single id and handed to find_one. Both receive an
# empty options hash.
def self.find(ids)
  ids.is_a?(Array) ? find_some(ids, {}) : find_one(ids, {})
end
|
.find_available(limit, queue = Delayed::Settings.queue, min_priority = Delayed::MIN_PRIORITY, max_priority = Delayed::MAX_PRIORITY) ⇒ Object
254
255
256
257
258
259
260
261
262
263
|
# File 'lib/delayed/backend/redis/job.rb', line 254
# Returns up to +limit+ jobs from +queue+ within the given priority range.
#
# The queue and priorities are validated with check_queue/check_priorities,
# ids are fetched from functions.find_available (offset 0, evaluated at
# db_time_now), and the ids are then loaded through find.
def self.find_available(limit,
                        queue = Delayed::Settings.queue,
                        min_priority = Delayed::MIN_PRIORITY,
                        max_priority = Delayed::MAX_PRIORITY)
  check_queue(queue)
  check_priorities(min_priority, max_priority)

  available_ids = functions.find_available(queue, limit, 0, min_priority, max_priority, db_time_now)
  find(available_ids)
end
|
.find_one(id, options) ⇒ Object
206
207
208
209
|
# File 'lib/delayed/backend/redis/job.rb', line 206
# Loads the single job with the given +id+ via get_with_ids.
# +options+ is accepted but not used.
#
# Raises ActiveRecord::RecordNotFound when no job is returned for the id.
def self.find_one(id, options)
  found = get_with_ids([id]).first
  raise(ActiveRecord::RecordNotFound, "Couldn't find Job with ID=#{id}") unless found

  found
end
|
.find_some(ids, options) ⇒ Object
211
212
213
|
# File 'lib/delayed/backend/redis/job.rb', line 211
# Loads the jobs for +ids+ via get_with_ids and drops the nil entries, so
# ids without a job are silently skipped. +options+ is accepted but not used.
def self.find_some(ids, options)
  loaded = get_with_ids(ids)
  loaded.compact
end
|
.get_and_lock_next_available(worker_name, queue = Delayed::Settings.queue, min_priority = Delayed::MIN_PRIORITY, max_priority = Delayed::MAX_PRIORITY, prefetch: nil, prefetch_owner: nil, forced_latency: nil) ⇒ Object
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
|
# File 'lib/delayed/backend/redis/job.rb', line 223
# Locks and returns the next available job in +queue+ within the given
# priority range.
#
# worker_name    - a single worker name, or an Array of names (only the
#                  first name is used for locking).
# forced_latency - when given, it is subtracted from db_time_now before the
#                  lookup.
# prefetch / prefetch_owner - accepted but not used by this implementation.
#
# Returns the job (or nil when instantiate_from_attrs yields nil) for a
# single worker name. When an Array of names was passed, returns a hash
# { worker_name => job }, or {} when no job was locked.
#
# The multiple-worker wrapping is kept separate from instantiation: the job
# is always built from the returned attributes, and only the shape of the
# return value depends on whether an Array was passed.
def self.get_and_lock_next_available(worker_name,
                                     queue = Delayed::Settings.queue,
                                     min_priority = Delayed::MIN_PRIORITY,
                                     max_priority = Delayed::MAX_PRIORITY,
                                     prefetch: nil,
                                     prefetch_owner: nil,
                                     forced_latency: nil)
  check_queue(queue)
  check_priorities(min_priority, max_priority)

  multiple_workers = worker_name.is_a?(Array)
  worker_name = worker_name.first if multiple_workers

  now = db_time_now
  now -= forced_latency if forced_latency

  job_attrs = functions.get_and_lock_next_available(worker_name, queue, min_priority, max_priority, now)
  # May be nil; the nil check below covers that case for the Array form.
  job = instantiate_from_attrs(job_attrs)
  return job unless multiple_workers

  job.nil? ? {} : { worker_name => job }
end
|
.get_with_ids(ids) ⇒ Object
215
216
217
|
# File 'lib/delayed/backend/redis/job.rb', line 215
# Loads one job per id, in order: each id is mapped to its redis key with
# key_for_job_id, the hash stored there is read with hgetall, and the result
# is passed to instantiate_from_attrs. Issues one hgetall per id.
def self.get_with_ids(ids)
  ids.map do |job_id|
    stored_attrs = redis.hgetall(key_for_job_id(job_id))
    instantiate_from_attrs(stored_attrs)
  end
end
|
.instantiate(attrs) ⇒ Object
144
145
146
147
148
|
# File 'lib/delayed/backend/redis/job.rb', line 144
# Builds a job from +attrs+ and clears its @new_record flag, so the returned
# instance is treated as an existing record rather than a new one.
def self.instantiate(attrs)
  new(attrs).tap { |job| job.instance_variable_set(:@new_record, false) }
end
|
.jobs_count(flavor, queue = Delayed::Settings.queue) ⇒ Object
Get the total job count for the given flavor. flavor is :current, :future or :failed. For the :failed flavor, queue is currently ignored.
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
|
# File 'lib/delayed/backend/redis/job.rb', line 301
# Returns the number of entries in the sorted set backing the given flavor.
#
# flavor - :current, :future or :failed (compared as a string).
# queue  - validated with check_queue and used for :current and :future;
#          not consulted for :failed.
#
# Raises ArgumentError for any other flavor.
def self.jobs_count(flavor,
                    queue = Delayed::Settings.queue)
  flavor_name = flavor.to_s
  counted_key =
    case flavor_name
    when 'current', 'future'
      check_queue(queue)
      flavor_name == 'current' ? Keys::QUEUE[queue] : Keys::FUTURE_QUEUE[queue]
    when 'failed'
      Keys::FAILED_JOBS
    else
      raise ArgumentError, "invalid flavor: #{flavor.inspect}"
    end
  redis.zcard(counted_key)
end
|
.key_for_job_id(job_id) ⇒ Object
219
220
221
|
# File 'lib/delayed/backend/redis/job.rb', line 219
# Returns the result of looking up +job_id+ in Keys::JOB. get_with_ids passes
# that value to redis.hgetall as the key holding the job's attributes.
def self.key_for_job_id(job_id)
Keys::JOB[job_id]
end
|
.list_jobs(flavor, limit, offset = 0, query = nil) ⇒ Object
Get a list of jobs of the given flavor in the given queue. flavor is :current, :future, :failed, :strand or :tag. Depending on the flavor, query has a different meaning: for :current and :future, it’s the queue name (defaults to Delayed::Settings.queue); for :strand it’s the strand name; for :tag it’s the tag name; for :failed it’s ignored.
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
|
# File 'lib/delayed/backend/redis/job.rb', line 272
# Returns one page of jobs of the given flavor.
#
# flavor - :current, :future, :failed, :strand or :tag (compared as a string).
# limit  - maximum number of jobs to return.
# offset - number of jobs to skip (default 0).
# query  - queue name for :current/:future (defaults to
#          Delayed::Settings.queue), strand name for :strand, tag name for
#          :tag; not used for :failed.
#
# Raises ArgumentError for any other flavor.
def self.list_jobs(flavor,
                   limit,
                   offset = 0,
                   query = nil)
  case flavor.to_s
  when 'current'
    query ||= Delayed::Settings.queue
    check_queue(query)
    self.find(functions.find_available(query, limit, offset, 0, "+inf", db_time_now))
  when 'future'
    query ||= Delayed::Settings.queue
    check_queue(query)
    self.find(redis.zrangebyscore(Keys::FUTURE_QUEUE[query], 0, "+inf", :limit => [offset, limit]))
  when 'failed'
    Failed.find(redis.zrevrangebyscore(Keys::FAILED_JOBS, "+inf", 0, :limit => [offset, limit]))
  when 'strand'
    self.find(redis.lrange(Keys::STRAND[query], offset, offset + limit - 1))
  when 'tag'
    ids = redis.smembers(Keys::TAG[query])
    # Array#[] returns nil once offset is past the end of the array; without
    # the fallback find would treat nil as a single id and raise instead of
    # returning an empty page.
    self.find(ids[offset, limit] || [])
  else
    raise ArgumentError, "invalid flavor: #{flavor.inspect}"
  end
end
|
.reconnect! ⇒ Object
194
195
196
197
198
199
200
|
# File 'lib/delayed/backend/redis/job.rb', line 194
# Re-establishes the redis connection. Uses redis.reconnect when the
# connection object responds to it, otherwise falls back to
# redis.client.reconnect.
def self.reconnect!
  if redis.respond_to?(:reconnect)
    redis.reconnect
  else
    redis.client.reconnect
  end
end
|
.running_jobs ⇒ Object
321
322
323
|
# File 'lib/delayed/backend/redis/job.rb', line 321
# Returns the jobs whose ids are stored in the Keys::RUNNING_JOBS sorted set
# with a score between 0 and +inf, loaded through find.
def self.running_jobs
  running_ids = redis.zrangebyscore(Keys::RUNNING_JOBS, 0, "+inf")
  find(running_ids)
end
|
.strand_size(strand) ⇒ Object
317
318
319
|
# File 'lib/delayed/backend/redis/job.rb', line 317
# Returns the length (redis LLEN) of the list stored under
# Keys::STRAND[strand].
def self.strand_size(strand)
  strand_key = Keys::STRAND[strand]
  redis.llen(strand_key)
end
|
.tag_counts(flavor, limit, offset = 0) ⇒ Object
Returns a list of hashes { :tag => tag_name, :count => current_count } in descending count order. flavor is :current or :all.
336
337
338
339
340
341
342
|
# File 'lib/delayed/backend/redis/job.rb', line 336
# Returns one page of tag counts as an array of
# { :tag => name, :count => score } hashes, in the order redis returns them
# from a reverse range-by-score over Keys::TAG_COUNTS[flavor] (scores from
# +inf down to 1).
#
# flavor - :current or :all (compared as a string).
#
# Raises ArgumentError for any other flavor.
def self.tag_counts(flavor,
                    limit,
                    offset = 0)
  unless %w(current all).include?(flavor.to_s)
    raise(ArgumentError, "invalid flavor: #{flavor.inspect}")
  end

  counts_key = Keys::TAG_COUNTS[flavor]
  scored_tags = redis.zrevrangebyscore(counts_key, '+inf', 1, :limit => [offset, limit], :withscores => true)
  scored_tags.map do |tag, count|
    { :tag => tag, :count => count }
  end
end
|
.unlock(jobs) ⇒ Object
362
363
364
365
|
# File 'lib/delayed/backend/redis/job.rb', line 362
# Calls unlock! on each of +jobs+ and returns how many jobs were given.
def self.unlock(jobs)
  jobs.each { |job| job.unlock! }
  jobs.length
end
|
Instance Method Details
#==(other) ⇒ Object
186
187
188
|
# File 'lib/delayed/backend/redis/job.rb', line 186
# Two jobs are equal when +other+ is an instance of this job's class (or a
# subclass of it) and both have the same id.
def ==(other)
  return false unless other.is_a?(self.class)

  id == other.id
end
|
#[](key) ⇒ Object
162
163
164
|
# File 'lib/delayed/backend/redis/job.rb', line 162
# Hash-style attribute read: invokes the method named by +key+ and returns
# its result.
# NOTE(review): this goes through send, so any method name (including a
# private one) can be invoked — confirm keys always come from trusted code.
def [](key)
send(key)
end
|
#[]=(key, value) ⇒ Object
166
167
168
|
# File 'lib/delayed/backend/redis/job.rb', line 166
# Hash-style attribute write: invokes the "<key>=" writer with +value+.
# NOTE(review): this goes through send, so any writer (including a private
# one) can be invoked — confirm keys always come from trusted code.
def []=(key, value)
send("#{key}=", value)
end
|
#changes_applied ⇒ Object
399
400
401
402
|
# File 'lib/delayed/backend/redis/job.rb', line 399
# Stores the current #changes in @previously_changed and then empties
# @changed_attributes. #save calls this after running its callbacks.
# NOTE(review): presumably this resets the ActiveModel::Dirty tracking the
# class includes; it assumes @changed_attributes is already set — confirm.
def changes_applied
@previously_changed = changes
@changed_attributes.clear
end
|
#create_and_lock!(worker_name) ⇒ Object
423
424
425
426
|
# File 'lib/delayed/backend/redis/job.rb', line 423
# Locks a not-yet-saved job for +worker_name+ via lock_in_redis!.
#
# Raises RuntimeError ("job already exists") when the job is not a new record.
def create_and_lock!(worker_name)
  if new_record?
    lock_in_redis!(worker_name)
  else
    raise "job already exists"
  end
end
|
#destroy ⇒ Object
409
410
411
412
413
|
# File 'lib/delayed/backend/redis/job.rb', line 409
# Removes the job through the class's functions.destroy_job (passing the job
# id and the class's db_time_now), flags the instance as destroyed, and
# freezes it. Returns the frozen job.
def destroy
  job_class = self.class
  job_class.functions.destroy_job(id, job_class.db_time_now)
  @destroyed = true
  freeze
end
|
#destroyed? ⇒ Boolean
182
183
184
|
# File 'lib/delayed/backend/redis/job.rb', line 182
# True once @destroyed has been set to a truthy value (as #destroy does),
# false otherwise. Always returns a strict boolean.
def destroyed?
  @destroyed ? true : false
end
|
#fail! ⇒ Object
428
429
430
431
432
433
434
|
# File 'lib/delayed/backend/redis/job.rb', line 428
# Marks the job as failed and returns self.
#
# Steps, in order: stamp failed_at with the class's db_time_now, persist with
# save! (which raises when the save fails), rename the job's redis key from
# Keys::JOB[id] to Keys::FAILED_JOB[id], then call tickle_strand.
def fail!
self.failed_at = self.class.db_time_now
save!
redis.rename Keys::JOB[id], Keys::FAILED_JOB[id]
tickle_strand
self
end
|
#hash ⇒ Object
190
191
192
|
# File 'lib/delayed/backend/redis/job.rb', line 190
# Hash code derived solely from the job id, in line with #==, which also
# compares jobs by id.
def hash
id.hash
end
|
#lock_in_redis!(worker_name) ⇒ Object
374
375
376
377
378
|
# File 'lib/delayed/backend/redis/job.rb', line 374
# Records +worker_name+ as the lock holder: sets locked_at to the class's
# db_time_now and locked_by to the worker, then saves. Returns the result of
# #save.
def lock_in_redis!(worker_name)
  lock_time = self.class.db_time_now
  self.locked_at = lock_time
  self.locked_by = worker_name
  save
end
|
#new_record? ⇒ Boolean
178
179
180
|
# File 'lib/delayed/backend/redis/job.rb', line 178
# True while @new_record is truthy (set by #initialize, cleared by
# .instantiate), false otherwise. Always returns a strict boolean.
def new_record?
  @new_record ? true : false
end
|
#save(*a) ⇒ Object
385
386
387
388
389
390
391
392
393
394
395
396
|
# File 'lib/delayed/backend/redis/job.rb', line 385
# Persists the job inside the :save callbacks.
#
# A destroyed job is never saved (returns false immediately). A new record is
# created inside the :create callbacks; an existing one is updated. After the
# callbacks have run, changes_applied is called and the callbacks' result is
# returned. Arguments are accepted but not used.
def save(*a)
  return false if destroyed?

  outcome = run_callbacks(:save) do
    new_record? ? run_callbacks(:create) { create } : update
  end
  changes_applied
  outcome
end
|
#save!(*a) ⇒ Object
405
406
407
|
# File 'lib/delayed/backend/redis/job.rb', line 405
# Like #save, but raises RecordNotSaved when #save returns a falsy value.
# Otherwise returns whatever #save returned.
def save!(*a)
  outcome = save(*a)
  raise(RecordNotSaved) unless outcome

  outcome
end
|
#tickle_strand ⇒ Object
take this job off the strand, and queue up the next strand job if this job was at the front
417
418
419
420
421
|
# File 'lib/delayed/backend/redis/job.rb', line 417
# When the job has a strand, asks the class's functions.tickle_strand to
# process it, passing the job id, the strand and the class's db_time_now.
# Per the method's documentation this takes the job off the strand and queues
# up the next strand job if this one was at the front.
# Returns nil when the job has no strand.
def tickle_strand
  return unless strand.present?

  job_class = self.class
  job_class.functions.tickle_strand(id, strand, job_class.db_time_now)
end
|
#transfer_lock!(from:, to:) ⇒ Object
370
371
372
|
# File 'lib/delayed/backend/redis/job.rb', line 370
# Locks the job for the worker named by +to+ via lock_in_redis!.
# The +from+ keyword is required by the signature but is not used here.
def transfer_lock!(from:, to:)
lock_in_redis!(to)
end
|
#unlock! ⇒ Object
380
381
382
383
|
# File 'lib/delayed/backend/redis/job.rb', line 380
# Calls #unlock (listed among the methods included from Base) and then
# persists the job with #save!, which raises when the save fails.
def unlock!
unlock
save!
end
|