Module: Resque::Scheduler::DelayingExtensions

Included in:
Extension
Defined in:
lib/resque/scheduler/delaying_extensions.rb

Instance Method Summary collapse

Instance Method Details

#clean_up_timestamp(key, timestamp) ⇒ Object



300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
# File 'lib/resque/scheduler/delaying_extensions.rb', line 300

def clean_up_timestamp(key, timestamp)
  # Use a watch here to ensure nobody adds jobs to this delayed
  # queue while we're removing it.
  redis.watch(key) do
    if redis.llen(key).to_i == 0
      # If the list is empty, remove it.
      redis.multi do |transaction|
        transaction.del(key)
        transaction.zrem(:delayed_queue_schedule, timestamp.to_i)
      end
    else
      redis.redis.unwatch
    end
  end
end

#count_all_scheduled_jobsObject



267
268
269
270
271
272
273
# File 'lib/resque/scheduler/delaying_extensions.rb', line 267

def count_all_scheduled_jobs
  total_jobs = 0
  Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |ts|
    total_jobs += redis.llen("delayed:#{ts}").to_i
  end
  total_jobs
end

#delay_or_enqueue_at(timestamp, klass, *args) ⇒ Object

Update the delayed timestamp of any matching delayed jobs or enqueue a new job if no matching jobs are found. Returns the number of delayed or enqueued jobs.



69
70
71
72
73
74
75
76
# File 'lib/resque/scheduler/delaying_extensions.rb', line 69

def delay_or_enqueue_at(timestamp, klass, *args)
  count = remove_delayed(klass, *args)
  count = 1 if count == 0

  count.times do
    enqueue_at(timestamp, klass, *args)
  end
end

#delay_or_enqueue_in(number_of_seconds_from_now, klass, *args) ⇒ Object

Identical to delay_or_enqueue_at, except it takes number_of_seconds_from_now instead of a timestamp



80
81
82
# File 'lib/resque/scheduler/delaying_extensions.rb', line 80

def delay_or_enqueue_in(number_of_seconds_from_now, klass, *args)
  delay_or_enqueue_at(Time.now + number_of_seconds_from_now, klass, *args)
end

#delayed?(klass, *args) ⇒ Boolean

Discover if a job has been delayed. Examples

Resque.delayed?(MyJob)
Resque.delayed?(MyJob, id: 1)

Returns true if the job has been delayed

Returns:

  • (Boolean)


280
281
282
# File 'lib/resque/scheduler/delaying_extensions.rb', line 280

def delayed?(klass, *args)
  !scheduled_at(klass, *args).empty?
end

#delayed_push(timestamp, item) ⇒ Object

Used internally to stuff the item into the schedule sorted list. timestamp can be either in seconds or a datetime object. The insertion time complexity is O(log(n)). Returns true if it’s the first job to be scheduled at that time, else false.



88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/resque/scheduler/delaying_extensions.rb', line 88

def delayed_push(timestamp, item)
  # First add this item to the list for this timestamp
  redis.rpush("delayed:#{timestamp.to_i}", encode(item))

  # Store the timestamps at with this item occurs
  redis.sadd("timestamps:#{encode(item)}", ["delayed:#{timestamp.to_i}"])

  # Now, add this timestamp to the zsets.  The score and the value are
  # the same since we'll be querying by timestamp, and we don't have
  # anything else to store.
  redis.zadd :delayed_queue_schedule, timestamp.to_i, timestamp.to_i
end

#delayed_queue_peek(start, count) ⇒ Object

Returns an array of timestamps based on start and count



102
103
104
105
106
# File 'lib/resque/scheduler/delaying_extensions.rb', line 102

def delayed_queue_peek(start, count)
  result = redis.zrange(:delayed_queue_schedule, start,
                        start + count - 1)
  Array(result).map(&:to_i)
end

#delayed_queue_schedule_sizeObject

Returns the size of the delayed queue schedule this does not represent the number of items in the queue to be scheduled



110
111
112
# File 'lib/resque/scheduler/delaying_extensions.rb', line 110

def delayed_queue_schedule_size
  redis.zcard :delayed_queue_schedule
end

#delayed_timestamp_peek(timestamp, start, count) ⇒ Object

Returns an array of delayed items for the given timestamp



121
122
123
124
125
126
127
128
# File 'lib/resque/scheduler/delaying_extensions.rb', line 121

def delayed_timestamp_peek(timestamp, start, count)
  if 1 == count
    r = list_range "delayed:#{timestamp.to_i}", start, count
    r.nil? ? [] : [r]
  else
    list_range "delayed:#{timestamp.to_i}", start, count
  end
end

#delayed_timestamp_size(timestamp) ⇒ Object

Returns the number of jobs for a given timestamp in the delayed queue schedule



116
117
118
# File 'lib/resque/scheduler/delaying_extensions.rb', line 116

def delayed_timestamp_size(timestamp)
  redis.llen("delayed:#{timestamp.to_i}").to_i
end

#enqueue_at(timestamp, klass, *args) ⇒ Object

This method is nearly identical to enqueue only it also takes a timestamp which will be used to schedule the job for queueing. Until timestamp is in the past, the job will sit in the schedule list.



13
14
15
16
17
18
# File 'lib/resque/scheduler/delaying_extensions.rb', line 13

def enqueue_at(timestamp, klass, *args)
  validate(klass)
  enqueue_at_with_queue(
    queue_from_class(klass), timestamp, klass, *args
  )
end

#enqueue_at_with_queue(queue, timestamp, klass, *args) ⇒ Object

Identical to enqueue_at, except you can also specify a queue in which the job will be placed after the timestamp has passed. It respects Resque.inline option, by creating the job right away instead of adding to the queue.



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/resque/scheduler/delaying_extensions.rb', line 24

def enqueue_at_with_queue(queue, timestamp, klass, *args)
  return false unless plugin.run_before_schedule_hooks(klass, *args)

  if Resque.inline? || timestamp.to_i <= Time.now.to_i
    # Just create the job and let resque perform it right away with
    # inline.  If the class is a custom job class, call self#scheduled
    # on it. This allows you to do things like
    # Resque.enqueue_at(timestamp, CustomJobClass, :opt1 => val1).
    # Otherwise, pass off to Resque.
    if klass.respond_to?(:scheduled)
      klass.scheduled(queue, klass.to_s, *args)
    else
      Resque.enqueue_to(queue, klass, *args)
    end
  else
    delayed_push(timestamp, job_to_hash_with_queue(queue, klass, args))
  end

  plugin.run_after_schedule_hooks(klass, *args)
end

#enqueue_delayed(klass, *args) ⇒ Object

Given an encoded item, enqueue it now



174
175
176
177
178
179
# File 'lib/resque/scheduler/delaying_extensions.rb', line 174

def enqueue_delayed(klass, *args)
  hash = job_to_hash(klass, args)
  remove_delayed(klass, *args).times do
    Resque::Scheduler.enqueue_from_config(hash)
  end
end

#enqueue_delayed_selection(klass = nil) ⇒ Object

Given a block, enqueue jobs now that return true from a block

This allows for enqueuing of delayed jobs that have arguments matching certain criteria

Raises:

  • (ArgumentError)


205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/resque/scheduler/delaying_extensions.rb', line 205

def enqueue_delayed_selection(klass = nil)
  raise ArgumentError, 'Please supply a block' unless block_given?

  found_jobs = find_delayed_selection(klass) { |args| yield(args) }
  found_jobs.reduce(0) do |sum, encoded_job|
    decoded_job = decode(encoded_job)
    klass = Util.constantize(decoded_job['class'])
    queue = decoded_job['queue']

    if queue
      jobs_queued = enqueue_delayed_with_queue(klass, queue, *decoded_job['args'])
    else
      jobs_queued = enqueue_delayed(klass, *decoded_job['args'])
    end

    jobs_queued + sum
  end
end

#enqueue_delayed_with_queue(klass, queue, *args) ⇒ Object



181
182
183
184
185
186
# File 'lib/resque/scheduler/delaying_extensions.rb', line 181

def enqueue_delayed_with_queue(klass, queue, *args)
  hash = job_to_hash_with_queue(queue, klass, args)
  remove_delayed_in_queue(klass, queue, *args).times do
    Resque::Scheduler.enqueue_from_config(hash)
  end
end

#enqueue_in(number_of_seconds_from_now, klass, *args) ⇒ Object

Identical to enqueue_at but takes number_of_seconds_from_now instead of a timestamp.



47
48
49
50
51
52
# File 'lib/resque/scheduler/delaying_extensions.rb', line 47

def enqueue_in(number_of_seconds_from_now, klass, *args)
  unless number_of_seconds_from_now.is_a?(Numeric)
    raise ArgumentError, 'Please supply a numeric number of seconds'
  end
  enqueue_at(Time.now + number_of_seconds_from_now, klass, *args)
end

#enqueue_in_with_queue(queue, number_of_seconds_from_now, klass, *args) ⇒ Object

Identical to enqueue_in, except you can also specify a queue in which the job will be placed after the number of seconds has passed.



57
58
59
60
61
62
63
64
# File 'lib/resque/scheduler/delaying_extensions.rb', line 57

def enqueue_in_with_queue(queue, number_of_seconds_from_now,
                          klass, *args)
  unless number_of_seconds_from_now.is_a?(Numeric)
    raise ArgumentError, 'Please supply a numeric number of seconds'
  end
  enqueue_at_with_queue(queue, Time.now + number_of_seconds_from_now,
                        klass, *args)
end

#find_delayed_selection(klass = nil, &block) ⇒ Object

Given a block, find jobs that return true from a block

This allows for finding of delayed jobs that have arguments matching certain criteria

Raises:

  • (ArgumentError)


228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
# File 'lib/resque/scheduler/delaying_extensions.rb', line 228

def find_delayed_selection(klass = nil, &block)
  raise ArgumentError, 'Please supply a block' unless block_given?

  timestamps = redis.zrange(:delayed_queue_schedule, 0, -1)

  # Beyond 100 there's almost no improvement in speed
  found = timestamps.each_slice(100).map do |ts_group|
    jobs = redis.pipelined do |pipeline|
      ts_group.each do |ts|
        pipeline.lrange("delayed:#{ts}", 0, -1)
      end
    end

    jobs.flatten.select do |payload|
      payload_matches_selection?(decode(payload), klass, &block)
    end
  end

  found.flatten
end

#get_last_enqueued_at(job_name) ⇒ Object



296
297
298
# File 'lib/resque/scheduler/delaying_extensions.rb', line 296

def get_last_enqueued_at(job_name)
  redis.hget('delayed:last_enqueued_at', job_name)
end

#last_enqueued_at(job_name, date) ⇒ Object



292
293
294
# File 'lib/resque/scheduler/delaying_extensions.rb', line 292

def last_enqueued_at(job_name, date)
  redis.hset('delayed:last_enqueued_at', job_name, date)
end

#next_delayed_timestamp(at_time = nil) ⇒ Object

Returns the next delayed queue timestamp (don’t call directly)



132
133
134
# File 'lib/resque/scheduler/delaying_extensions.rb', line 132

def next_delayed_timestamp(at_time = nil)
  search_first_delayed_timestamp_in_range(nil, at_time || Time.now)
end

#next_item_for_timestamp(timestamp) ⇒ Object

Returns the next item to be processed for a given timestamp, nil if done. (don’t call directly) timestamp can either be in seconds or a datetime



139
140
141
142
143
144
145
146
147
148
149
# File 'lib/resque/scheduler/delaying_extensions.rb', line 139

def next_item_for_timestamp(timestamp)
  key = "delayed:#{timestamp.to_i}"

  encoded_item = redis.lpop(key)
  redis.srem("timestamps:#{encoded_item}", [key])
  item = decode(encoded_item)

  # If the list is empty, remove it.
  clean_up_timestamp(key, timestamp)
  item
end

#remove_delayed(klass, *args) ⇒ Object

Given an encoded item, remove it from the delayed_queue



163
164
165
166
# File 'lib/resque/scheduler/delaying_extensions.rb', line 163

def remove_delayed(klass, *args)
  search = encode(job_to_hash(klass, args))
  remove_delayed_job(search)
end

#remove_delayed_in_queue(klass, queue, *args) ⇒ Object



168
169
170
171
# File 'lib/resque/scheduler/delaying_extensions.rb', line 168

def remove_delayed_in_queue(klass, queue, *args)
  search = encode(job_to_hash_with_queue(queue, klass, args))
  remove_delayed_job(search)
end

#remove_delayed_job_from_timestamp(timestamp, klass, *args) ⇒ Object

Given a timestamp and job (klass + args) it removes all instances and returns the count of jobs removed.

O(N) where N is the number of jobs scheduled to fire at the given timestamp



254
255
256
257
258
259
260
261
262
263
264
265
# File 'lib/resque/scheduler/delaying_extensions.rb', line 254

def remove_delayed_job_from_timestamp(timestamp, klass, *args)
  return 0 if Resque.inline?

  key = "delayed:#{timestamp.to_i}"
  encoded_job = encode(job_to_hash(klass, args))

  redis.srem("timestamps:#{encoded_job}", [key])
  count = redis.lrem(key, 0, encoded_job)
  clean_up_timestamp(key, timestamp)

  count
end

#remove_delayed_selection(klass = nil) ⇒ Object

Given a block, remove jobs that return true from a block

This allows for removal of delayed jobs that have arguments matching certain criteria

Raises:

  • (ArgumentError)


192
193
194
195
196
197
198
199
# File 'lib/resque/scheduler/delaying_extensions.rb', line 192

def remove_delayed_selection(klass = nil)
  raise ArgumentError, 'Please supply a block' unless block_given?

  found_jobs = find_delayed_selection(klass) { |args| yield(args) }
  found_jobs.reduce(0) do |sum, encoded_job|
    sum + remove_delayed_job(encoded_job)
  end
end

#reset_delayed_queueObject

Clears all jobs created with enqueue_at or enqueue_in



152
153
154
155
156
157
158
159
160
# File 'lib/resque/scheduler/delaying_extensions.rb', line 152

def reset_delayed_queue
  Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |item|
    key = "delayed:#{item}"
    items = redis.lrange(key, 0, -1)
    redis.del(key, items.map { |ts_item| "timestamps:#{ts_item}" })
  end

  redis.del :delayed_queue_schedule
end

#scheduled_at(klass, *args) ⇒ Object

Returns delayed jobs schedule timestamp for klass, args.



285
286
287
288
289
290
# File 'lib/resque/scheduler/delaying_extensions.rb', line 285

def scheduled_at(klass, *args)
  search = encode(job_to_hash(klass, args))
  redis.smembers("timestamps:#{search}").map do |key|
    key.tr('delayed:', '').to_i
  end
end