Class: Resque::Plugins::State::Hash

Inherits:
Hash
  • Object
show all
Defined in:
lib/resque/plugins/state/hash.rb

Overview

Resque::Plugins::State::Hash is a Hash object that has helper methods for dealing with the common status attributes. It also has a number of class methods for creating/updating/retrieving status objects from Redis

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Hash

Create a new Resque::Plugins::State::Hash object. If multiple arguments are passed it is assumed the first argument is the UUID and the rest are status objects. All arguments are subsequentily merged in order. Strings are assumed to be messages.



255
256
257
258
259
260
261
262
263
264
265
266
267
# File 'lib/resque/plugins/state/hash.rb', line 255

def initialize(*args)
  super nil
  base_status = {
    'time' => Time.now.to_i,
    'status' => Resque::Plugins::State::STATUS_QUEUED
  }
  base_status['uuid'] = args.shift if args.length > 1
  status_hash = args.inject(base_status) do |final, m|
    m = { 'message' => m } if m.is_a?(String)
    final.merge(m || {})
  end
  replace(status_hash)
end

Class Attribute Details

.expire_inObject

Returns the value of attribute expire_in.



186
187
188
# File 'lib/resque/plugins/state/hash.rb', line 186

def expire_in
  @expire_in
end

Class Method Details

.clear(range_start = nil, range_end = nil) ⇒ Object

clear statuses from redis passing an optional range. See ‘statuses` for info about ranges



47
48
49
50
51
# File 'lib/resque/plugins/state/hash.rb', line 47

def self.clear(range_start = nil, range_end = nil)
  status_ids(range_start, range_end).each do |id|
    remove(id)
  end
end

.clear_completed(range_start = nil, range_end = nil) ⇒ Object



53
54
55
56
57
58
59
60
61
62
# File 'lib/resque/plugins/state/hash.rb', line 53

def self.clear_completed(range_start = nil, range_end = nil)
  status_ids(range_start, range_end).select do |id|
    if get(id).completed?
      remove(id)
      true
    else
      false
    end
  end
end

.clear_failed(range_start = nil, range_end = nil) ⇒ Object



64
65
66
67
68
69
70
71
72
73
# File 'lib/resque/plugins/state/hash.rb', line 64

def self.clear_failed(range_start = nil, range_end = nil)
  status_ids(range_start, range_end).select do |id|
    if get(id).failed?
      remove(id)
      true
    else
      false
    end
  end
end

.countObject



75
76
77
# File 'lib/resque/plugins/state/hash.rb', line 75

def self.count
  redis.zcard(set_key)
end

.create(uuid, *messages) ⇒ Object

Create a status, generating a new UUID, passing the message to the status Returns the UUID of the new status.



12
13
14
15
16
17
# File 'lib/resque/plugins/state/hash.rb', line 12

def self.create(uuid, *messages)
  set(uuid, *messages)
  redis.zadd(set_key, Time.now.to_i, uuid)
  redis.zremrangebyscore(set_key, 0, Time.now.to_i - @expire_in) if @expire_in
  uuid
end

.generate_uuidObject



210
211
212
# File 'lib/resque/plugins/state/hash.rb', line 210

def self.generate_uuid
  SecureRandom.hex.to_s
end

.get(uuid) ⇒ Object

Get a status by UUID. Returns a Resque::Plugins::State::Hash



20
21
22
23
# File 'lib/resque/plugins/state/hash.rb', line 20

def self.get(uuid)
  val = redis.get(status_key(uuid))
  val ? Resque::Plugins::State::Hash.new(uuid, decode(val)) : nil
end

.hash_accessor(name, options = {}) ⇒ Object



214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/resque/plugins/state/hash.rb', line 214

def self.hash_accessor(name, options = {})
  options[:default] ||= nil
  coerce = options[:coerce] ? ".#{options[:coerce]}" : ''
  module_eval <<-EOT
  def #{name}
    value = (self['#{name}'] ? self['#{name}']#{coerce} : #{options[:default].inspect})
    yield value if block_given?
    value
  end

  def #{name}=(value)
    self['#{name}'] = value
  end

  def #{name}?
    !!self['#{name}']
  end
  EOT
end

.kill(uuid) ⇒ Object

Kill the job at UUID on its next iteration this works by adding the UUID to a kill list (a.k.a. a list of jobs to be killed. Each iteration the job checks if it should be killed by calling tick or at. If so, it raises a Resque::Plugins::State::Killed error and sets the status to ‘killed’.



113
114
115
# File 'lib/resque/plugins/state/hash.rb', line 113

def self.kill(uuid)
  redis.sadd(kill_key, uuid)
end

.kill_idsObject

Return the UUIDs of the jobs on the kill list



123
124
125
# File 'lib/resque/plugins/state/hash.rb', line 123

def self.kill_ids
  redis.smembers(kill_key)
end

.kill_keyObject



202
203
204
# File 'lib/resque/plugins/state/hash.rb', line 202

def self.kill_key
  '_kill'
end

.killall(range_start = nil, range_end = nil) ⇒ Object

Kills num jobs within range starting with the most recent first. By default kills all jobs. Note that the same conditions apply as kill, i.e. only jobs that check on each iteration by calling tick or at are eligible to killed.

Examples:

killing the last 20 submitted jobs

Resque::Plugins::State::Hash.killall(0, 20)

Parameters:

  • range_start (Numeric) (defaults to: nil)

    The optional starting range

  • range_end (Numeric) (defaults to: nil)

    The optional ending range



135
136
137
138
139
# File 'lib/resque/plugins/state/hash.rb', line 135

def self.killall(range_start = nil, range_end = nil)
  status_ids(range_start, range_end).collect do |id|
    kill(id)
  end
end

.killed(uuid) ⇒ Object

Remove the job at UUID from the kill list



118
119
120
# File 'lib/resque/plugins/state/hash.rb', line 118

def self.killed(uuid)
  redis.srem(kill_key, uuid)
end

.mget(uuids) ⇒ Object

Get multiple statuses by UUID. Returns array of Resque::Plugins::State::Hash



26
27
28
29
30
31
32
33
34
# File 'lib/resque/plugins/state/hash.rb', line 26

def self.mget(uuids)
  return [] if uuids.empty?
  status_keys = uuids.map { |u| status_key(u) }
  vals = redis.mget(*status_keys)

  uuids.zip(vals).map do |uuid, val|
    val ? Resque::Plugins::State::Hash.new(uuid, decode(val)) : nil
  end
end

.pause(uuid) ⇒ Object

pause the job at UUID on its next iteration this works by adding the UUID to a pause list (a.k.a. a list of jobs to be pauseed. Each iteration the job checks if it should be pauseed by calling tick or at. If so, it sleeps for 10 seconds before checking again if it should continue sleeping



150
151
152
# File 'lib/resque/plugins/state/hash.rb', line 150

def self.pause(uuid)
  redis.sadd(pause_key, uuid)
end

.pause_idsObject

Return the UUIDs of the jobs on the pause list



160
161
162
# File 'lib/resque/plugins/state/hash.rb', line 160

def self.pause_ids
  redis.smembers(pause_key)
end

.pause_keyObject



206
207
208
# File 'lib/resque/plugins/state/hash.rb', line 206

def self.pause_key
  '_pause'
end

.pauseall(range_start = nil, range_end = nil) ⇒ Object

pauses num jobs within range starting with the most recent first. By default pauses all jobs. Note that the same conditions apply as pause, i.e. only jobs that check on each iteration by calling tick or at are eligible to pauseed.

Examples:

pauseing the last 20 submitted jobs

Resque::Plugins::State::Hash.pauseall(0, 20)

Parameters:

  • range_start (Numeric) (defaults to: nil)

    The optional starting range

  • range_end (Numeric) (defaults to: nil)

    The optional ending range



172
173
174
175
176
# File 'lib/resque/plugins/state/hash.rb', line 172

def self.pauseall(range_start = nil, range_end = nil)
  status_ids(range_start, range_end).collect do |id|
    pause(id)
  end
end

.remove(uuid) ⇒ Object



79
80
81
82
# File 'lib/resque/plugins/state/hash.rb', line 79

def self.remove(uuid)
  redis.del(status_key(uuid))
  redis.zrem(set_key, uuid)
end

.set(uuid, *messages) ⇒ Object

set a status by UUID. messages can be any number of strings or hashes that are merged in order to create a single status.



38
39
40
41
42
43
# File 'lib/resque/plugins/state/hash.rb', line 38

def self.set(uuid, *messages)
  val = Resque::Plugins::State::Hash.new(uuid, *messages)
  redis.set(status_key(uuid), encode(val))
  redis.expire(status_key(uuid), expire_in) if expire_in
  val
end

.set_keyObject



198
199
200
# File 'lib/resque/plugins/state/hash.rb', line 198

def self.set_key
  '_statuses'
end

.should_kill?(uuid) ⇒ Boolean

Check whether a job with UUID is on the kill list

Returns:

  • (Boolean)


142
143
144
# File 'lib/resque/plugins/state/hash.rb', line 142

def self.should_kill?(uuid)
  redis.sismember(kill_key, uuid)
end

.should_pause?(uuid) ⇒ Boolean

Check whether a job with UUID is on the pause list

Returns:

  • (Boolean)


179
180
181
# File 'lib/resque/plugins/state/hash.rb', line 179

def self.should_pause?(uuid)
  redis.sismember(pause_key, uuid)
end

.status_ids(range_start = nil, range_end = nil) ⇒ Object

Return the num most recent status/job UUIDs in reverse chronological order.



96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/resque/plugins/state/hash.rb', line 96

def self.status_ids(range_start = nil, range_end = nil)
  if range_end && range_start
    # Because we want a reverse chronological order, we need to get a range starting
    # by the higest negative number. The ordering is transparent from the API user's
    # perspective so we need to convert the passed params
    (redis.zrevrange(set_key, range_start.abs, (range_end || 1).abs) || [])
  else
    # Because we want a reverse chronological order, we need to get a range starting
    # by the higest negative number.
    redis.zrevrange(set_key, 0, -1) || []
  end
end

.status_key(uuid) ⇒ Object



194
195
196
# File 'lib/resque/plugins/state/hash.rb', line 194

def self.status_key(uuid)
  "status:#{uuid}"
end

.statuses(range_start = nil, range_end = nil) ⇒ Object

Return num Resque::Plugins::State::Hash objects in reverse chronological order. By default returns the entire set.

Examples:

retuning the last 20 statuses

Resque::Plugins::State::Hash.statuses(0, 20)

Parameters:

  • range_start (Numeric) (defaults to: nil)

    The optional starting range

  • range_end (Numeric) (defaults to: nil)

    The optional ending range



90
91
92
93
# File 'lib/resque/plugins/state/hash.rb', line 90

def self.statuses(range_start = nil, range_end = nil)
  ids = status_ids(range_start, range_end)
  mget(ids).compact || []
end

.unpause(uuid) ⇒ Object

Remove the job at UUID from the pause list



155
156
157
# File 'lib/resque/plugins/state/hash.rb', line 155

def self.unpause(uuid)
  redis.srem(pause_key, uuid)
end

Instance Method Details

#inspectObject



324
325
326
# File 'lib/resque/plugins/state/hash.rb', line 324

def inspect
  "#<Resque::Plugins::State::Hash #{super}>"
end

#jsonObject

Return a JSON representation of the current object.



318
319
320
321
322
# File 'lib/resque/plugins/state/hash.rb', line 318

def json
  h = dup
  h['pct_complete'] = pct_complete
  self.class.encode(h)
end

#killable?Boolean

Can the job be killed? failed, completed, and killed jobs can’t be killed, for obvious reasons

Returns:

  • (Boolean)


301
302
303
# File 'lib/resque/plugins/state/hash.rb', line 301

def killable?
  !failed? && !completed? && !killed?
end

#pausable?Boolean

Can the job be paused? failed, completed, paused, and killed jobs can’t be paused, for obvious reasons

Returns:

  • (Boolean)


307
308
309
# File 'lib/resque/plugins/state/hash.rb', line 307

def pausable?
  !failed? && !completed? && !killed? && !paused?
end

#pct_completeObject

calculate the % completion of the job based on status, num and total



271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
# File 'lib/resque/plugins/state/hash.rb', line 271

def pct_complete
  if completed?
    100
  elsif queued?
    0
  elsif failed?
    0
  else
    if total.nil?
      t = 1
    else t = total
    end
    (((num || 0).to_f / t.to_f) * 100).to_i
  end
end

#timeObject

Return the time of the status initialization. If set returns a Time object, otherwise returns nil



289
290
291
# File 'lib/resque/plugins/state/hash.rb', line 289

def time
  time? ? Time.at(self['time']) : nil
end

#to_json(*_args) ⇒ Object



312
313
314
# File 'lib/resque/plugins/state/hash.rb', line 312

def to_json(*_args)
  json
end