Class: Resque::Plugins::State::Hash
- Inherits:
-
Hash
- Object
- Hash
- Resque::Plugins::State::Hash
- Defined in:
- lib/resque/plugins/state/hash.rb
Overview
Resque::Plugins::State::Hash is a Hash object that has helper methods for dealing with the common status attributes. It also has a number of class methods for creating/updating/retrieving status objects from Redis
Class Attribute Summary collapse
-
.expire_in ⇒ Object
Returns the value of attribute expire_in.
Class Method Summary collapse
-
.clear(range_start = nil, range_end = nil) ⇒ Object
clear statuses from redis passing an optional range.
- .clear_completed(range_start = nil, range_end = nil) ⇒ Object
- .clear_failed(range_start = nil, range_end = nil) ⇒ Object
-
.create(uuid, *messages) ⇒ Object
Create a status, generating a new UUID, passing the message to the status Returns the UUID of the new status.
- .generate_uuid ⇒ Object
-
.get(uuid) ⇒ Object
Get a status by UUID.
- .hash_accessor(name, options = {}) ⇒ Object
-
.kill(uuid) ⇒ Object
Kill the job at UUID on its next iteration this works by adding the UUID to a kill list (a.k.a. a list of jobs to be killed. Each iteration the job checks if it should be killed by calling
tickorat. If so, it raises aResque::Plugins::State::Killederror and sets the status to ‘killed’.. -
.kill_ids ⇒ Object
Return the UUIDs of the jobs on the kill list.
- .kill_key ⇒ Object
-
.killall(range_start = nil, range_end = nil) ⇒ Object
Kills
numjobs within range starting with the most recent first. -
.killed(uuid) ⇒ Object
Remove the job at UUID from the kill list.
-
.mget(uuids) ⇒ Object
Get multiple statuses by UUID.
-
.pause(uuid) ⇒ Object
pause the job at UUID on its next iteration this works by adding the UUID to a pause list (a.k.a. a list of jobs to be pauseed. Each iteration the job checks if it should be pauseed by calling
tickorat. If so, it sleeps for 10 seconds before checking again if it should continue sleeping. -
.pause_ids ⇒ Object
Return the UUIDs of the jobs on the pause list.
- .pause_key ⇒ Object
-
.pauseall(range_start = nil, range_end = nil) ⇒ Object
pauses
numjobs within range starting with the most recent first. - .remove(uuid) ⇒ Object
-
.set(uuid, *messages) ⇒ Object
set a status by UUID.
- .set_key ⇒ Object
-
.should_kill?(uuid) ⇒ Boolean
Check whether a job with UUID is on the kill list.
-
.should_pause?(uuid) ⇒ Boolean
Check whether a job with UUID is on the pause list.
-
.status_ids(range_start = nil, range_end = nil) ⇒ Object
Return the
nummost recent status/job UUIDs in reverse chronological order. - .status_key(uuid) ⇒ Object
-
.statuses(range_start = nil, range_end = nil) ⇒ Object
Return
numResque::Plugins::State::Hash objects in reverse chronological order. -
.unpause(uuid) ⇒ Object
Remove the job at UUID from the pause list.
Instance Method Summary collapse
-
#initialize(*args) ⇒ Hash
constructor
Create a new Resque::Plugins::State::Hash object.
- #inspect ⇒ Object
-
#json ⇒ Object
Return a JSON representation of the current object.
-
#killable? ⇒ Boolean
Can the job be killed? failed, completed, and killed jobs can’t be killed, for obvious reasons.
-
#pausable? ⇒ Boolean
Can the job be paused? failed, completed, paused, and killed jobs can’t be paused, for obvious reasons.
-
#pct_complete ⇒ Object
calculate the % completion of the job based on
status,numandtotal. -
#time ⇒ Object
Return the time of the status initialization.
- #to_json(*_args) ⇒ Object
Constructor Details
#initialize(*args) ⇒ Hash
Create a new Resque::Plugins::State::Hash object. If multiple arguments are passed it is assumed the first argument is the UUID and the rest are status objects. All arguments are subsequentily merged in order. Strings are assumed to be messages.
251 252 253 254 255 256 257 258 259 260 261 262 263 |
# File 'lib/resque/plugins/state/hash.rb', line 251 def initialize(*args) super nil base_status = { 'time' => Time.now.to_i, 'status' => Resque::Plugins::State::STATUS_QUEUED } base_status['uuid'] = args.shift if args.length > 1 status_hash = args.inject(base_status) do |final, m| m = { 'message' => m } if m.is_a?(String) final.merge(m || {}) end replace(status_hash) end |
Class Attribute Details
.expire_in ⇒ Object
Returns the value of attribute expire_in.
182 183 184 |
# File 'lib/resque/plugins/state/hash.rb', line 182 def expire_in @expire_in end |
Class Method Details
.clear(range_start = nil, range_end = nil) ⇒ Object
clear statuses from redis passing an optional range. See statuses for info about ranges
47 48 49 50 51 |
# File 'lib/resque/plugins/state/hash.rb', line 47 def self.clear(range_start = nil, range_end = nil) status_ids(range_start, range_end).each do |id| remove(id) end end |
.clear_completed(range_start = nil, range_end = nil) ⇒ Object
53 54 55 56 57 58 59 60 61 62 |
# File 'lib/resque/plugins/state/hash.rb', line 53 def self.clear_completed(range_start = nil, range_end = nil) status_ids(range_start, range_end).select do |id| if get(id).completed? remove(id) true else false end end end |
.clear_failed(range_start = nil, range_end = nil) ⇒ Object
64 65 66 67 68 69 70 71 72 73 |
# File 'lib/resque/plugins/state/hash.rb', line 64 def self.clear_failed(range_start = nil, range_end = nil) status_ids(range_start, range_end).select do |id| if get(id).failed? remove(id) true else false end end end |
.create(uuid, *messages) ⇒ Object
Create a status, generating a new UUID, passing the message to the status Returns the UUID of the new status.
12 13 14 15 16 17 |
# File 'lib/resque/plugins/state/hash.rb', line 12 def self.create(uuid, *) set(uuid, *) redis.zadd(set_key, Time.now.to_i, uuid) redis.zremrangebyscore(set_key, 0, Time.now.to_i - @expire_in) if @expire_in uuid end |
.generate_uuid ⇒ Object
206 207 208 |
# File 'lib/resque/plugins/state/hash.rb', line 206 def self.generate_uuid SecureRandom.hex.to_s end |
.get(uuid) ⇒ Object
Get a status by UUID. Returns a Resque::Plugins::State::Hash
20 21 22 23 |
# File 'lib/resque/plugins/state/hash.rb', line 20 def self.get(uuid) val = redis.get(status_key(uuid)) val ? Resque::Plugins::State::Hash.new(uuid, decode(val)) : nil end |
.hash_accessor(name, options = {}) ⇒ Object
210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 |
# File 'lib/resque/plugins/state/hash.rb', line 210 def self.hash_accessor(name, = {}) [:default] ||= nil coerce = [:coerce] ? ".#{options[:coerce]}" : '' module_eval " def \#{name}\n value = (self['\#{name}'] ? self['\#{name}']\#{coerce} : \#{options[:default].inspect})\n yield value if block_given?\n value\n end\n\n def \#{name}=(value)\n self['\#{name}'] = value\n end\n\n def \#{name}?\n !!self['\#{name}']\n end\n EOT\nend\n" |
.kill(uuid) ⇒ Object
Kill the job at UUID on its next iteration this works by adding the UUID to a kill list (a.k.a. a list of jobs to be killed. Each iteration the job checks if it should be killed by calling tick or at. If so, it raises a Resque::Plugins::State::Killed error and sets the status to ‘killed’.
109 110 111 |
# File 'lib/resque/plugins/state/hash.rb', line 109 def self.kill(uuid) redis.sadd(kill_key, uuid) end |
.kill_ids ⇒ Object
Return the UUIDs of the jobs on the kill list
119 120 121 |
# File 'lib/resque/plugins/state/hash.rb', line 119 def self.kill_ids redis.smembers(kill_key) end |
.kill_key ⇒ Object
198 199 200 |
# File 'lib/resque/plugins/state/hash.rb', line 198 def self.kill_key '_kill' end |
.killall(range_start = nil, range_end = nil) ⇒ Object
Kills num jobs within range starting with the most recent first. By default kills all jobs. Note that the same conditions apply as kill, i.e. only jobs that check on each iteration by calling tick or at are eligible to killed.
131 132 133 134 135 |
# File 'lib/resque/plugins/state/hash.rb', line 131 def self.killall(range_start = nil, range_end = nil) status_ids(range_start, range_end).collect do |id| kill(id) end end |
.killed(uuid) ⇒ Object
Remove the job at UUID from the kill list
114 115 116 |
# File 'lib/resque/plugins/state/hash.rb', line 114 def self.killed(uuid) redis.srem(kill_key, uuid) end |
.mget(uuids) ⇒ Object
Get multiple statuses by UUID. Returns array of Resque::Plugins::State::Hash
26 27 28 29 30 31 32 33 34 |
# File 'lib/resque/plugins/state/hash.rb', line 26 def self.mget(uuids) return [] if uuids.empty? status_keys = uuids.map { |u| status_key(u) } vals = redis.mget(*status_keys) uuids.zip(vals).map do |uuid, val| val ? Resque::Plugins::State::Hash.new(uuid, decode(val)) : nil end end |
.pause(uuid) ⇒ Object
pause the job at UUID on its next iteration this works by adding the UUID to a pause list (a.k.a. a list of jobs to be pauseed. Each iteration the job checks if it should be pauseed by calling tick or at. If so, it sleeps for 10 seconds before checking again if it should continue sleeping
146 147 148 |
# File 'lib/resque/plugins/state/hash.rb', line 146 def self.pause(uuid) redis.sadd(pause_key, uuid) end |
.pause_ids ⇒ Object
Return the UUIDs of the jobs on the pause list
156 157 158 |
# File 'lib/resque/plugins/state/hash.rb', line 156 def self.pause_ids redis.smembers(pause_key) end |
.pause_key ⇒ Object
202 203 204 |
# File 'lib/resque/plugins/state/hash.rb', line 202 def self.pause_key '_pause' end |
.pauseall(range_start = nil, range_end = nil) ⇒ Object
pauses num jobs within range starting with the most recent first. By default pauses all jobs. Note that the same conditions apply as pause, i.e. only jobs that check on each iteration by calling tick or at are eligible to pauseed.
168 169 170 171 172 |
# File 'lib/resque/plugins/state/hash.rb', line 168 def self.pauseall(range_start = nil, range_end = nil) status_ids(range_start, range_end).collect do |id| pause(id) end end |
.remove(uuid) ⇒ Object
75 76 77 78 |
# File 'lib/resque/plugins/state/hash.rb', line 75 def self.remove(uuid) redis.del(status_key(uuid)) redis.zrem(set_key, uuid) end |
.set(uuid, *messages) ⇒ Object
set a status by UUID. messages can be any number of strings or hashes that are merged in order to create a single status.
38 39 40 41 42 43 |
# File 'lib/resque/plugins/state/hash.rb', line 38 def self.set(uuid, *) val = Resque::Plugins::State::Hash.new(uuid, *) redis.set(status_key(uuid), encode(val)) redis.expire(status_key(uuid), expire_in) if expire_in val end |
.set_key ⇒ Object
194 195 196 |
# File 'lib/resque/plugins/state/hash.rb', line 194 def self.set_key '_statuses' end |
.should_kill?(uuid) ⇒ Boolean
Check whether a job with UUID is on the kill list
138 139 140 |
# File 'lib/resque/plugins/state/hash.rb', line 138 def self.should_kill?(uuid) redis.sismember(kill_key, uuid) end |
.should_pause?(uuid) ⇒ Boolean
Check whether a job with UUID is on the pause list
175 176 177 |
# File 'lib/resque/plugins/state/hash.rb', line 175 def self.should_pause?(uuid) redis.sismember(pause_key, uuid) end |
.status_ids(range_start = nil, range_end = nil) ⇒ Object
Return the num most recent status/job UUIDs in reverse chronological order.
92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/resque/plugins/state/hash.rb', line 92 def self.status_ids(range_start = nil, range_end = nil) if range_end && range_start # Because we want a reverse chronological order, we need to get a range starting # by the higest negative number. The ordering is transparent from the API user's # perspective so we need to convert the passed params (redis.zrevrange(set_key, range_start.abs, (range_end || 1).abs) || []) else # Because we want a reverse chronological order, we need to get a range starting # by the higest negative number. redis.zrevrange(set_key, 0, -1) || [] end end |
.status_key(uuid) ⇒ Object
190 191 192 |
# File 'lib/resque/plugins/state/hash.rb', line 190 def self.status_key(uuid) "status:#{uuid}" end |
.statuses(range_start = nil, range_end = nil) ⇒ Object
Return num Resque::Plugins::State::Hash objects in reverse chronological order. By default returns the entire set.
86 87 88 89 |
# File 'lib/resque/plugins/state/hash.rb', line 86 def self.statuses(range_start = nil, range_end = nil) ids = status_ids(range_start, range_end) mget(ids).compact || [] end |
.unpause(uuid) ⇒ Object
Remove the job at UUID from the pause list
151 152 153 |
# File 'lib/resque/plugins/state/hash.rb', line 151 def self.unpause(uuid) redis.srem(pause_key, uuid) end |
Instance Method Details
#inspect ⇒ Object
320 321 322 |
# File 'lib/resque/plugins/state/hash.rb', line 320 def inspect "#<Resque::Plugins::State::Hash #{super}>" end |
#json ⇒ Object
Return a JSON representation of the current object.
314 315 316 317 318 |
# File 'lib/resque/plugins/state/hash.rb', line 314 def json h = dup h['pct_complete'] = pct_complete self.class.encode(h) end |
#killable? ⇒ Boolean
Can the job be killed? failed, completed, and killed jobs can’t be killed, for obvious reasons
297 298 299 |
# File 'lib/resque/plugins/state/hash.rb', line 297 def killable? !failed? && !completed? && !killed? end |
#pausable? ⇒ Boolean
Can the job be paused? failed, completed, paused, and killed jobs can’t be paused, for obvious reasons
303 304 305 |
# File 'lib/resque/plugins/state/hash.rb', line 303 def pausable? !failed? && !completed? && !killed? && !paused? end |
#pct_complete ⇒ Object
calculate the % completion of the job based on status, num and total
267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 |
# File 'lib/resque/plugins/state/hash.rb', line 267 def pct_complete if completed? 100 elsif queued? 0 elsif failed? 0 else if total.nil? t = 1 else t = total end (((num || 0).to_f / t.to_f) * 100).to_i end end |
#time ⇒ Object
Return the time of the status initialization. If set returns a Time object, otherwise returns nil
285 286 287 |
# File 'lib/resque/plugins/state/hash.rb', line 285 def time time? ? Time.at(self['time']) : nil end |
#to_json(*_args) ⇒ Object
308 309 310 |
# File 'lib/resque/plugins/state/hash.rb', line 308 def to_json(*_args) json end |