Module: Resque::Plugins::State

Included in:
JobWithState
Defined in:
lib/resque/plugins/state.rb,
lib/resque/plugins/state/hash.rb

Overview

Resque::Plugins::State is a module your jobs include. It provides instance helper methods for updating a job's status from within the job, as well as class methods for creating and queuing jobs.

To get this functionality, include Resque::Plugins::State and implement a perform method.

For example:

class ExampleJob
  include Resque::Plugins::State

  def perform
    num = options['num']
    i = 0
    while i < num
      i += 1
      at(i, num)
    end
    completed("Finished!")
  end

end

This job iterates num times, updating the status as it goes. At the end, it updates the status to tell anyone listening that the job is complete.
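To see which status updates the loop above produces without a Redis server, the following sketch swaps in a recording stand-in for the plugin. RecordingState, its calls list, and the uuid string are hypothetical names introduced only for this illustration; the real module writes each update to the job's status hash instead.

```ruby
# Hypothetical stand-in for Resque::Plugins::State: it records calls
# instead of writing to Redis. Not part of the plugin.
module RecordingState
  attr_reader :options, :calls

  def initialize(uuid, options = {})
    @uuid    = uuid
    @options = options
    @calls   = []
  end

  # Same signature as the plugin's #at, but only records the arguments.
  def at(num, total, *messages)
    @calls << [:at, num, total, *messages]
  end

  # Same signature as the plugin's #completed, but only records the arguments.
  def completed(*messages)
    @calls << [:completed, *messages]
  end
end

class ExampleJob
  include RecordingState

  def perform
    num = options['num']
    i = 0
    while i < num
      i += 1
      at(i, num)
    end
    completed('Finished!')
  end
end

job = ExampleJob.new('hypothetical-uuid', 'num' => 3)
job.perform
job.calls.each { |call| p call }
```

With 'num' => 3 the job reports progress three times through at and then calls completed once.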

Defined Under Namespace

Modules: ClassMethods
Classes: Hash, Killed, NotANumber, Revert

Constant Summary

VERSION = '1.1.1'.freeze
STATUS_QUEUED = 'queued'.freeze
STATUS_WORKING = 'working'.freeze
STATUS_COMPLETED = 'completed'.freeze
STATUS_FAILED = 'failed'.freeze
STATUS_KILLED = 'killed'.freeze
STATUS_PAUSED = 'paused'.freeze
STATUS_WAITING = 'waiting'.freeze
STATUS_REVERTING = 'reverting'.freeze
STATUS_REVERTED = 'reverted'.freeze
STATUSES = [
  STATUS_QUEUED,
  STATUS_WORKING,
  STATUS_COMPLETED,
  STATUS_FAILED,
  STATUS_KILLED,
  STATUS_PAUSED,
  STATUS_WAITING,
  STATUS_REVERTING,
  STATUS_REVERTED
].freeze

Instance Attribute Details

#options ⇒ Object (readonly)

Returns the value of attribute options.



# File 'lib/resque/plugins/state.rb', line 63

def options
  @options
end

#uuid ⇒ Object (readonly)

Returns the value of attribute uuid.



# File 'lib/resque/plugins/state.rb', line 63

def uuid
  @uuid
end

Class Method Details

.included(base) ⇒ Object



# File 'lib/resque/plugins/state.rb', line 65

def self.included(base)
  base.extend(ClassMethods)
end

Instance Method Details

#at(num, total, *messages) ⇒ Object

Set the status of the job for the current iteration. num and total are passed to the status along with any messages. Raises NotANumber if total does not convert to a number greater than zero. This will kill the job if it has been added to the kill list with Resque::Plugins::State::Hash.kill()



# File 'lib/resque/plugins/state.rb', line 247

def at(num, total, *messages)
  if total.to_f <= 0.0
    raise(NotANumber,
          "Called at() with total=#{total} which is not a number")
  end
  tick({
         'num' => num,
         'total' => total
       }, *messages)
end
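The guard at the top of at relies on to_f, so it accepts numeric strings and rejects nil, zero, negative values, and non-numeric strings. The sketch below mirrors only that comparison; acceptable_total? is a hypothetical helper name and not part of the plugin.

```ruby
# Mirrors the guard in #at: any total whose to_f is <= 0.0 is rejected.
# `acceptable_total?` is a hypothetical helper used only for illustration.
def acceptable_total?(total)
  total.to_f > 0.0
end

[10, '10', 2.5, 0, -1, nil, 'ten'].each do |total|
  outcome = acceptable_total?(total) ? 'accepted' : 'raises NotANumber'
  puts "#{total.inspect}: #{outcome}"
end
```

Because the check is on total alone, num is passed through unvalidated.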

#completed(*messages) ⇒ Object

Set the status to ‘completed’, passing along any additional messages



# File 'lib/resque/plugins/state.rb', line 290

def completed(*messages)
  job_status({
               'status' => STATUS_COMPLETED,
               'message' => "Completed at #{Time.now}"
             }, *messages)
  @logger.info("Job #{@uuid}: #{messages.join(' ')}")
end

#failed(*messages) ⇒ Object

set the status to ‘failed’ passing along any additional messages



# File 'lib/resque/plugins/state.rb', line 278

def failed(*messages)
  job_status({ 'status' => STATUS_FAILED }, *messages)
  @logger.error("Job #{@uuid}: #{messages.join(' ')}")
end

#initialize(uuid, options = {}) ⇒ Object

Create a new instance with uuid and options



# File 'lib/resque/plugins/state.rb', line 164

def initialize(uuid, options = {})
  @reverting = false
  @uuid      = uuid
  @options   = options
  @logger    = Resque.logger
end

#kill! ⇒ Object

Kill the current job, setting the status to ‘killed’ and raising Killed

Raises:

  • (Killed)


# File 'lib/resque/plugins/state.rb', line 300

def kill!
  messages = ["Killed at #{Time.now}"]
  job_status('status' => STATUS_KILLED,
             'message' => messages[0])
  @logger.error("Job #{@uuid}: #{messages.join(' ')}")
  raise Killed
end

#lock!(key = nil) ⇒ Object

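Lock against a provided or automatic key to prevent duplicate jobs. The automatic key is the SHA1 digest of the job's options serialized as JSON. If the key is already locked, the status is set to ‘waiting’ and the job polls every 10 seconds until the lock is released, honoring kill and pause requests while it waits.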



# File 'lib/resque/plugins/state.rb', line 344

def lock!(key = nil)
  lock = Digest::SHA1.hexdigest @options.to_json
  lock = key if key
  if locked?(lock)
    messages = ["Waiting at #{Time.now} due to existing job"]
    job_status('status' => STATUS_WAITING,
               'message' => messages[0])
    while locked?(lock)
      kill! if should_kill?
      pause! if should_pause?
      sleep 10
    end
  else
    Resque::Plugins::State::Hash.lock(lock)
  end
end
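As a minimal sketch of how the automatic key behaves, the snippet below repeats the digest line from lock! outside the plugin. automatic_lock_key and the option names are hypothetical; only the Digest::SHA1.hexdigest and to_json calls come from the listing above.

```ruby
require 'digest'
require 'json'

# Same derivation as the first line of #lock! and #unlock!.
# `automatic_lock_key` is a hypothetical helper used only for illustration.
def automatic_lock_key(options)
  Digest::SHA1.hexdigest(options.to_json)
end

a = automatic_lock_key('account' => 1, 'region' => 'eu')
b = automatic_lock_key('account' => 1, 'region' => 'eu')
c = automatic_lock_key('region' => 'eu', 'account' => 1)

puts a == b # identical options produce the same key
puts a == c # the same pairs in another order serialize differently
```

Since the key depends on the serialized form, two jobs whose options hold the same pairs in a different order would not block each other. Passing an explicit key to lock! and unlock! avoids relying on key order.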

#locked?(key) ⇒ Boolean

Checks against the lock list if this specific job instance should wait before starting

Returns:

  • (Boolean)


# File 'lib/resque/plugins/state.rb', line 239

def locked?(key)
  Resque::Plugins::State::Hash.locked?(key)
end

#name ⇒ Object



# File 'lib/resque/plugins/state.rb', line 215

def name
  "#{self.class.name}(#{options.inspect unless options.empty?})"
end

#pause!(pause_text = nil) ⇒ Object

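Pause the current job, setting the status to ‘paused’ and sleeping in 10-second intervals for as long as the job remains on the pause list. Kill and revert requests are checked on each interval.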

Raises:

  • (Killed)


# File 'lib/resque/plugins/state.rb', line 329

def pause!(pause_text = nil)
  Resque::Plugins::State::Hash.pause(uuid)
  messages = ["Paused at #{Time.now} #{pause_text}"]
  job_status('status' => STATUS_PAUSED,
             'message' => messages[0])
  raise Killed if @testing # Don't loop or complete during testing
  @logger.info("Job #{@uuid}: #{messages.join(' ')}")
  while should_pause?
    kill! if should_kill?
    revert! if should_revert?
    sleep 10
  end
end

#revert! ⇒ Object

Revert the current job, setting the status to ‘reverting’ and raising Revert. If the job does not define on_revert, an error is logged and the job is paused instead.



# File 'lib/resque/plugins/state.rb', line 310

def revert!
  if respond_to?(:on_revert)
    messages = ["Reverting at #{Time.now}"]
    Resque::Plugins::State::Hash.unpause(uuid) if should_pause?
    @reverting = true
    job_status('status' => STATUS_REVERTING,
               'message' => messages[0])
    @logger.info("Job #{@uuid}: #{messages.join(' ')}")
    raise Revert
  else
    @logger.error("Job #{@uuid}: Attempted revert on job with no revert"\
                  " support")
    Resque::Plugins::State::Hash.no_revert(@uuid)
    pause!('This job does not support revert functionality')
  end
end

#reverted(*messages) ⇒ Object

set the status to ‘reverted’ passing along any additional messages



# File 'lib/resque/plugins/state.rb', line 284

def reverted(*messages)
  job_status({ 'status' => STATUS_REVERTED }, *messages)
  @logger.error("Job #{@uuid}: #{messages.join(' ')}")
end

#safe_perform! ⇒ Object

Run by the Resque::Worker when processing this job. It wraps the perform method, ensuring that the final status of the job is set regardless of error. If an error occurs within the job’s work, it sets the status to failed, then calls on_failure if the job defines it and re-raises the error otherwise.



# File 'lib/resque/plugins/state.rb', line 175

def safe_perform!
  job_status('status' => STATUS_WORKING)
  messages = ['Job starting']
  @logger.info("#{@uuid}: #{messages.join(' ')}")
  perform
  if status && status.failed?
    on_failure(status.message) if respond_to?(:on_failure)
    return
  elsif status && !status.completed?
    completed
  end
  on_success if respond_to?(:on_success)
rescue Killed
  Resque::Plugins::State::Hash.killed(uuid)
  on_killed if respond_to?(:on_killed)
rescue Revert
  Resque::Plugins::State::Hash.revert(uuid)
  on_revert
  messages = ["Reverted at #{Time.now}"]
  job_status('status' => STATUS_REVERTED,
             'message' => messages[0])
rescue => e
  messages = ["Failed at #{Time.now}: #{e.message}"]
  @logger.error("Job #{@uuid}: #{messages.join(' ')}")
  failed("The task failed because of an error: #{e}")
  raise e unless respond_to?(:on_failure)
  on_failure(e)
end
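The hook dispatch in safe_perform! can be modeled in a few lines. This is a simplified sketch, not the plugin's code: the Killed and Revert classes are local stand-ins (their superclass is an assumption), run_with_hooks and the job classes are hypothetical, no status is written, and the returned symbols exist only to make the outcome visible. It also leaves out the branch where perform itself marks the job as failed.

```ruby
# Hypothetical stand-ins for the plugin's Killed and Revert exceptions.
class Killed < RuntimeError; end
class Revert < RuntimeError; end

# Simplified model of the rescue chain in safe_perform!.
# It only shows which optional hook runs for each outcome.
def run_with_hooks(job)
  job.perform
  job.on_success if job.respond_to?(:on_success)
  :success
rescue Killed
  job.on_killed if job.respond_to?(:on_killed)
  :killed
rescue Revert
  job.on_revert # revert! only raises Revert when on_revert exists
  :reverted
rescue => e
  raise e unless job.respond_to?(:on_failure)
  job.on_failure(e)
  :failed
end

# A job that fails but handles the failure itself.
class FlakyJob
  attr_reader :seen

  def perform
    raise ArgumentError, 'bad input'
  end

  def on_failure(error)
    @seen = error.message
  end
end

# A job that fails and defines no on_failure hook.
class StrictJob
  def perform
    raise ArgumentError, 'bad input'
  end
end

flaky = FlakyJob.new
puts run_with_hooks(flaky)
puts flaky.seen

begin
  run_with_hooks(StrictJob.new)
rescue ArgumentError => e
  puts "re-raised: #{e.message}"
end
```

In the listing above, on_failure receives status.message when perform marked the job as failed, and the exception object when an error was rescued, so a hook written against this module should accept either.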

#should_kill? ⇒ Boolean

Checks against the kill list if this specific job instance should be killed on the next iteration

Returns:

  • (Boolean)


# File 'lib/resque/plugins/state.rb', line 221

def should_kill?
  Resque::Plugins::State::Hash.should_kill?(uuid)
end

#should_pause? ⇒ Boolean

Checks against the pause list if this specific job instance should be paused on the next iteration

Returns:

  • (Boolean)


# File 'lib/resque/plugins/state.rb', line 227

def should_pause?
  Resque::Plugins::State::Hash.should_pause?(uuid)
end

#should_revert? ⇒ Boolean

Checks against the revert list if this specific job instance should be reverted on the next iteration

Returns:

  • (Boolean)


# File 'lib/resque/plugins/state.rb', line 233

def should_revert?
  Resque::Plugins::State::Hash.should_revert?(uuid)
end

#status ⇒ Object

get the Resque::Plugins::State::Hash object for the current uuid



# File 'lib/resque/plugins/state.rb', line 211

def status
  Resque::Plugins::State::Hash.get(uuid)
end

#status=(new_status) ⇒ Object

Set the job's status. Can take an array of strings or hashes that are merged (in order) into a final status hash.



# File 'lib/resque/plugins/state.rb', line 206

def status=(new_status)
  Resque::Plugins::State::Hash.set(uuid, *new_status)
end
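One way to picture the in-order merge is the sketch below. It rests on an assumption that is not confirmed by this page: that a String argument ends up under the 'message' key. The actual merge is performed by Resque::Plugins::State::Hash and may differ; merge_status is a hypothetical name.

```ruby
# ASSUMPTION: a String part is stored under the 'message' key.
# The real merge lives in Resque::Plugins::State::Hash and may differ.
def merge_status(*parts)
  parts.inject({}) do |final, part|
    part = { 'message' => part } if part.is_a?(String)
    final.merge(part || {})
  end
end

status = merge_status({ 'status' => 'working', 'num' => 1 },
                      { 'num' => 2, 'total' => 10 },
                      'Halfway there')
p status
```

Later parts win over earlier ones, so 'num' ends up as 2 in this sketch.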

#tick(*messages) ⇒ Object

Sets the status of the job for the current iteration. You should use the at method if you have actual numbers to track the iteration count. This will kill or pause the job if it has been added to either list with Resque::Plugins::State::Hash.kill() or Resque::Plugins::State::Hash.pause() respectively. It will also start a revert if the job has been added to the revert list.



# File 'lib/resque/plugins/state.rb', line 263

def tick(*messages)
  kill! if should_kill?
  if should_pause?
    pause!
  elsif should_revert?
    return revert! unless @reverting
    job_status({ 'status' => STATUS_REVERTING }, *messages)
    @logger.info("Job #{@uuid}: #{messages.join(' ')}")
  else
    job_status({ 'status' => STATUS_WORKING }, *messages)
    @logger.info("Job #{@uuid}: #{messages.join(' ')}")
  end
end
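The branch order in tick gives kill requests priority over pause requests, and pause requests priority over revert requests. The sketch below models only that ordering; tick_outcome is a hypothetical name, and its keyword arguments stand in for should_kill?, should_pause?, should_revert? and the @reverting flag.

```ruby
# Simplified model of the branch order in #tick.
# Returns a symbol naming the action or status the real method would reach.
def tick_outcome(kill: false, pause: false, revert: false, reverting: false)
  return :kill!   if kill            # checked first; kill! raises Killed
  return :pause!  if pause           # pause wins over revert
  return :working unless revert      # nothing requested: keep working
  reverting ? :reverting : :revert!  # first revert tick calls revert!
end

p tick_outcome
p tick_outcome(kill: true, pause: true)
p tick_outcome(pause: true, revert: true)
p tick_outcome(revert: true)
p tick_outcome(revert: true, reverting: true)
```

A paused job is not ignored by later requests: the loop inside pause! checks for kill and revert requests on each 10-second interval.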

#unlock!(key = nil) ⇒ Object

unlock the provided or automatic key at the end of a job



# File 'lib/resque/plugins/state.rb', line 362

def unlock!(key = nil)
  lock = Digest::SHA1.hexdigest @options.to_json
  lock = key if key
  Resque::Plugins::State::Hash.unlock(lock)
end