Class: Resque::Plugins::State::Hash

Inherits:
Hash
  • Object
show all
Defined in:
lib/resque/plugins/state/hash.rb

Overview

Resque::Plugins::State::Hash is a Hash object that has helper methods for dealing with the common status attributes. It also has a number of class methods for creating/updating/retrieving status objects from Redis

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Hash

Create a new Resque::Plugins::State::Hash object. If multiple arguments are passed it is assumed the first argument is the UUID and the rest are status objects. All arguments are subsequentily merged in order. Strings are assumed to be messages.



307
308
309
310
311
312
313
314
315
316
317
318
319
# File 'lib/resque/plugins/state/hash.rb', line 307

def initialize(*args)
  super nil
  base_status = {
    'time' => Time.now.to_i,
    'status' => Resque::Plugins::State::STATUS_QUEUED
  }
  base_status['uuid'] = args.shift if args.length > 1
  status_hash = args.inject(base_status) do |final, m|
    m = { 'message' => m } if m.is_a?(String)
    final.merge(m || {})
  end
  replace(status_hash)
end

Class Attribute Details

.expire_inObject

Returns the value of attribute expire_in.



234
235
236
# File 'lib/resque/plugins/state/hash.rb', line 234

def expire_in
  @expire_in
end

Class Method Details

.clear(range_start = nil, range_end = nil) ⇒ Object

clear statuses from redis passing an optional range. See ‘statuses` for info about ranges



51
52
53
54
55
# File 'lib/resque/plugins/state/hash.rb', line 51

def self.clear(range_start = nil, range_end = nil)
  status_ids(range_start, range_end).each do |id|
    remove(id)
  end
end

.clear_completed(range_start = nil, range_end = nil) ⇒ Object



57
58
59
60
61
62
63
64
65
66
# File 'lib/resque/plugins/state/hash.rb', line 57

def self.clear_completed(range_start = nil, range_end = nil)
  status_ids(range_start, range_end).select do |id|
    if get(id).completed?
      remove(id)
      true
    else
      false
    end
  end
end

.clear_failed(range_start = nil, range_end = nil) ⇒ Object



68
69
70
71
72
73
74
75
76
77
# File 'lib/resque/plugins/state/hash.rb', line 68

def self.clear_failed(range_start = nil, range_end = nil)
  status_ids(range_start, range_end).select do |id|
    if get(id).failed?
      remove(id)
      true
    else
      false
    end
  end
end

.countObject



79
80
81
# File 'lib/resque/plugins/state/hash.rb', line 79

def self.count
  redis.zcard(set_key)
end

.create(uuid, *messages) ⇒ Object

Create a status, a new UUID, passing the message to the status Returns the UUID of the new status.



12
13
14
15
16
17
18
19
20
21
# File 'lib/resque/plugins/state/hash.rb', line 12

def self.create(uuid, *messages)
  set(uuid, *messages)
  redis.zadd(set_key, Time.now.to_i, uuid)
  if @expire_in
    redis.zremrangebyscore(
      set_key, 0, Time.now.to_i - @expire_in
    )
  end
  uuid
end

.generate_uuidObject



262
263
264
# File 'lib/resque/plugins/state/hash.rb', line 262

def self.generate_uuid
  SecureRandom.hex.to_s
end

.get(uuid) ⇒ Object

Get a status by UUID. Returns a Resque::Plugins::State::Hash



24
25
26
27
# File 'lib/resque/plugins/state/hash.rb', line 24

def self.get(uuid)
  val = redis.get(status_key(uuid))
  val ? Resque::Plugins::State::Hash.new(uuid, decode(val)) : nil
end

.hash_accessor(name, options = {}) ⇒ Object



266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
# File 'lib/resque/plugins/state/hash.rb', line 266

def self.hash_accessor(name, options = {})
  options[:default] ||= nil
  coerce = options[:coerce] ? ".#{options[:coerce]}" : ''
  module_eval <<-EOT
  def #{name}
    value = (self['#{name}'] ? self['#{name}']#{coerce} : #{options[:default].inspect})
    yield value if block_given?
    value
  end

  def #{name}=(value)
    self['#{name}'] = value
  end

  def #{name}?
    !!self['#{name}']
  end
  EOT
end

.kill(uuid) ⇒ Object

Kill the job at UUID on its next iteration this works by adding the UUID to a kill list (a.k.a. a list of jobs to be killed. Each iteration the job checks if it should be killed by calling tick or at. If so, it raises a Resque::Plugins::State::Killed error and sets the status to ‘killed’.



120
121
122
# File 'lib/resque/plugins/state/hash.rb', line 120

def self.kill(uuid)
  redis.sadd(kill_key, uuid)
end

.kill_idsObject

Return the UUIDs of the jobs on the kill list



130
131
132
# File 'lib/resque/plugins/state/hash.rb', line 130

def self.kill_ids
  redis.smembers(kill_key)
end

.kill_keyObject



250
251
252
# File 'lib/resque/plugins/state/hash.rb', line 250

def self.kill_key
  '_kill'
end

.killall(range_start = nil, range_end = nil) ⇒ Object

Kills num jobs within range starting with the most recent first. By default kills all jobs. Note that the same conditions apply as kill, i.e. only jobs that check on each iteration by calling tick or at are eligible to killed.

Examples:

killing the last 20 submitted jobs

Resque::Plugins::State::Hash.killall(0, 20)

Parameters:

  • range_start (Numeric) (defaults to: nil)

    The optional starting range

  • range_end (Numeric) (defaults to: nil)

    The optional ending range



142
143
144
145
146
# File 'lib/resque/plugins/state/hash.rb', line 142

def self.killall(range_start = nil, range_end = nil)
  status_ids(range_start, range_end).collect do |id|
    kill(id)
  end
end

.killed(uuid) ⇒ Object

Remove the job at UUID from the kill list



125
126
127
# File 'lib/resque/plugins/state/hash.rb', line 125

def self.killed(uuid)
  redis.srem(kill_key, uuid)
end

.lock(key, timeout = 3600) ⇒ Object

Set a lock key to allow jobs to run as singletons. Optional timeout in seconds



216
217
218
219
# File 'lib/resque/plugins/state/hash.rb', line 216

def self.lock(key, timeout = 3600)
  redis.setnx("_lock-#{key}", key)
  redis.expire("_lock-#{key}", timeout)
end

.locked?(key) ⇒ Boolean

Check whether a key on the wait list

Returns:

  • (Boolean)


227
228
229
# File 'lib/resque/plugins/state/hash.rb', line 227

def self.locked?(key)
  redis.sismember("_lock-#{key}", key)
end

.mget(uuids) ⇒ Object

Get statuses by UUID. Returns array of Resque::Plugins::State::Hash



30
31
32
33
34
35
36
37
38
# File 'lib/resque/plugins/state/hash.rb', line 30

def self.mget(uuids)
  return [] if uuids.empty?
  status_keys = uuids.map { |u| status_key(u) }
  vals = redis.mget(*status_keys)

  uuids.zip(vals).map do |uuid, val|
    val ? Resque::Plugins::State::Hash.new(uuid, decode(val)) : nil
  end
end

.no_revert(uuid) ⇒ Object

Remove the job at UUID from the revert list; needed to handle jobs that don’t support revert



200
201
202
# File 'lib/resque/plugins/state/hash.rb', line 200

def self.no_revert(uuid)
  redis.srem(revert_key, uuid)
end

.pause(uuid) ⇒ Object

pause the job at UUID on its next iteration this works by adding the UUID to a pause list (a.k.a. a list of jobs to be pauseed. Each iteration the job checks if it should be pauseed by calling tick or at. If so, it sleeps for 10 seconds before checking again if it should continue sleeping



157
158
159
# File 'lib/resque/plugins/state/hash.rb', line 157

def self.pause(uuid)
  redis.sadd(pause_key, uuid)
end

.pause_idsObject

Return the UUIDs of the jobs on the pause list



167
168
169
# File 'lib/resque/plugins/state/hash.rb', line 167

def self.pause_ids
  redis.smembers(pause_key)
end

.pause_keyObject



254
255
256
# File 'lib/resque/plugins/state/hash.rb', line 254

def self.pause_key
  '_pause'
end

.pauseall(range_start = nil, range_end = nil) ⇒ Object

pauses num jobs within range starting with the most recent first. By default pauses all jobs. Note that the same conditions apply as pause, i.e. only jobs that check on each iteration by calling tick or at are eligible to pauseed.

Examples:

pauseing the last 20 submitted jobs

Resque::Plugins::State::Hash.pauseall(0, 20)

Parameters:

  • range_start (Numeric) (defaults to: nil)

    The optional starting range

  • range_end (Numeric) (defaults to: nil)

    The optional ending range



179
180
181
182
183
# File 'lib/resque/plugins/state/hash.rb', line 179

def self.pauseall(range_start = nil, range_end = nil)
  status_ids(range_start, range_end).collect do |id|
    pause(id)
  end
end

.remove(uuid) ⇒ Object



83
84
85
86
# File 'lib/resque/plugins/state/hash.rb', line 83

def self.remove(uuid)
  redis.del(status_key(uuid))
  redis.zrem(set_key, uuid)
end

.revert(uuid) ⇒ Object

revert the job at UUID on its next iteration this works by adding the UUID to a revert list (a.k.a. a list of jobs to be reverted. Each iteration the job checks if it should be reverted by calling tick or at. If so, it sleeps for 10 seconds before checking again if it should continue sleeping



194
195
196
# File 'lib/resque/plugins/state/hash.rb', line 194

def self.revert(uuid)
  redis.sadd(revert_key, uuid)
end

.revert_idsObject

Return the UUIDs of the jobs on the revert list



205
206
207
# File 'lib/resque/plugins/state/hash.rb', line 205

def self.revert_ids
  redis.smembers(revert_key)
end

.revert_keyObject



258
259
260
# File 'lib/resque/plugins/state/hash.rb', line 258

def self.revert_key
  '_revert'
end

.set(uuid, *messages) ⇒ Object

set a status by UUID. messages can be any number of strings or hashes that are merged in order to create a single status.



42
43
44
45
46
47
# File 'lib/resque/plugins/state/hash.rb', line 42

def self.set(uuid, *messages)
  val = Resque::Plugins::State::Hash.new(uuid, *messages)
  redis.set(status_key(uuid), encode(val))
  redis.expire(status_key(uuid), expire_in) if expire_in
  val
end

.set_keyObject



246
247
248
# File 'lib/resque/plugins/state/hash.rb', line 246

def self.set_key
  '_statuses'
end

.should_kill?(uuid) ⇒ Boolean

Check whether a job with UUID is on the kill list

Returns:

  • (Boolean)


149
150
151
# File 'lib/resque/plugins/state/hash.rb', line 149

def self.should_kill?(uuid)
  redis.sismember(kill_key, uuid)
end

.should_pause?(uuid) ⇒ Boolean

Check whether a job with UUID is on the pause list

Returns:

  • (Boolean)


186
187
188
# File 'lib/resque/plugins/state/hash.rb', line 186

def self.should_pause?(uuid)
  redis.sismember(pause_key, uuid)
end

.should_revert?(uuid) ⇒ Boolean

Check whether a job with UUID is on the revert list

Returns:

  • (Boolean)


210
211
212
# File 'lib/resque/plugins/state/hash.rb', line 210

def self.should_revert?(uuid)
  redis.sismember(revert_key, uuid)
end

.status_ids(range_start = nil, range_end = nil) ⇒ Object

Return the num most recent status/job UUIDs in reverse chronological order.



102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/resque/plugins/state/hash.rb', line 102

def self.status_ids(range_start = nil, range_end = nil)
  if range_end && range_start
    # Because we want a reverse chronological order, we need to get a
    # range starting by the higest negative number. The ordering is
    # transparent from the API user's perspective so we need to convert
    # the passed params
    (redis.zrevrange(set_key, range_start.abs, (range_end || 1).abs) || [])
  else
    # Because we want a reverse chronological order, we need to get a
    # range starting by the higest negative number.
    redis.zrevrange(set_key, 0, -1) || []
  end
end

.status_key(uuid) ⇒ Object



242
243
244
# File 'lib/resque/plugins/state/hash.rb', line 242

def self.status_key(uuid)
  "status:#{uuid}"
end

.statuses(range_start = nil, range_end = nil) ⇒ Object

Return num Resque::Plugins::State::Hash objects in reverse chronological order. By default returns the entire set.

Examples:

retuning the last 20 statuses

Resque::Plugins::State::Hash.statuses(0, 20)

Parameters:

  • range_start (Numeric) (defaults to: nil)

    The optional starting range

  • range_end (Numeric) (defaults to: nil)

    The optional ending range



95
96
97
98
# File 'lib/resque/plugins/state/hash.rb', line 95

def self.statuses(range_start = nil, range_end = nil)
  ids = status_ids(range_start, range_end)
  mget(ids).compact || []
end

.unlock(key) ⇒ Object

Remove a key from the lock list



222
223
224
# File 'lib/resque/plugins/state/hash.rb', line 222

def self.unlock(key)
  redis.srem("_lock-#{key}", key)
end

.unpause(uuid) ⇒ Object

Remove the job at UUID from the pause list



162
163
164
# File 'lib/resque/plugins/state/hash.rb', line 162

def self.unpause(uuid)
  redis.srem(pause_key, uuid)
end

Instance Method Details

#inspectObject



382
383
384
# File 'lib/resque/plugins/state/hash.rb', line 382

def inspect
  "#<Resque::Plugins::State::Hash #{super}>"
end

#jsonObject

Return a JSON representation of the current object.



376
377
378
379
380
# File 'lib/resque/plugins/state/hash.rb', line 376

def json
  h = dup
  h['pct_complete'] = pct_complete
  self.class.encode(h)
end

#killable?Boolean

Can the job be killed? failed, completed, reverted, and killed jobs can’t be killed, for obvious reasons

Returns:

  • (Boolean)


353
354
355
# File 'lib/resque/plugins/state/hash.rb', line 353

def killable?
  !failed? && !completed? && !killed? && !reverted?
end

#pausable?Boolean

Can the job be reverted? failed, completed, reverted, and killed jobs can’t be paused, for obvious reasons

Returns:

  • (Boolean)


359
360
361
# File 'lib/resque/plugins/state/hash.rb', line 359

def pausable?
  !failed? && !completed? && !killed? && !paused? && !reverted?
end

#pct_completeObject

calculate the % completion of the job based on status, num and total



323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
# File 'lib/resque/plugins/state/hash.rb', line 323

def pct_complete
  if completed?
    100
  elsif queued?
    0
  elsif failed?
    0
  else
    t = if total.nil?
          1
        else total
        end
    (((num || 0).to_f / t.to_f) * 100).to_i
  end
end

#timeObject

Return the time of the status initialization. If set returns a Time object, otherwise returns nil



341
342
343
# File 'lib/resque/plugins/state/hash.rb', line 341

def time
  time? ? Time.at(self['time']) : nil
end

#to_json(*_args) ⇒ Object



370
371
372
# File 'lib/resque/plugins/state/hash.rb', line 370

def to_json(*_args)
  json
end