Module: Resque::Plugins::State

Included in:
JobWithState
Defined in:
lib/resque/plugins/state.rb,
lib/resque/plugins/state/hash.rb

Overview

Resque::Plugins::State is a module your jobs will include. It provides helper methods for updating the status/etc from within an instance as well as class methods for creating and queuing the jobs.

All you have to do to get this functionality is include Resque::Plugins::State and then implement a <tt>perform<tt> method.

For example

class ExampleJob
  include Resque::Plugins::State

  def perform
    num = options['num']
    i = 0
    while i < num
      i += 1
      at(i, num)
    end
    completed("Finished!")
  end

end

This job would iterate num times updating the status as it goes. At the end we update the status telling anyone listening to this job that its complete.

Defined Under Namespace

Modules: ClassMethods Classes: Hash, Killed, NotANumber

Constant Summary collapse

VERSION =
'1.0.1'.freeze
STATUS_QUEUED =
'queued'.freeze
STATUS_WORKING =
'working'.freeze
STATUS_COMPLETED =
'completed'.freeze
STATUS_FAILED =
'failed'.freeze
STATUS_KILLED =
'killed'.freeze
STATUS_PAUSED =
'paused'.freeze
STATUSES =
[
  STATUS_QUEUED,
  STATUS_WORKING,
  STATUS_COMPLETED,
  STATUS_FAILED,
  STATUS_KILLED,
  STATUS_PAUSED
].freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#optionsObject (readonly)

Returns the value of attribute options.



54
55
56
# File 'lib/resque/plugins/state.rb', line 54

def options
  @options
end

#uuidObject (readonly)

Returns the value of attribute uuid.



54
55
56
# File 'lib/resque/plugins/state.rb', line 54

def uuid
  @uuid
end

Class Method Details

.included(base) ⇒ Object



56
57
58
# File 'lib/resque/plugins/state.rb', line 56

def self.included(base)
  base.extend(ClassMethods)
end

Instance Method Details

#at(num, total, *messages) ⇒ Object

set the status of the job for the current itteration. num and total are passed to the status as well as any messages. This will kill the job if it has been added to the kill list with Resque::Plugins::State::Hash.kill()



216
217
218
219
220
221
222
223
224
225
# File 'lib/resque/plugins/state.rb', line 216

def at(num, total, *messages)
  if total.to_f <= 0.0
    raise(NotANumber,
          "Called at() with total=#{total} which is not a number")
  end
  tick({
         'num' => num,
         'total' => total
       }, *messages)
end

#completed(*messages) ⇒ Object

set the status to ‘completed’ passing along any addional messages



249
250
251
252
253
254
255
# File 'lib/resque/plugins/state.rb', line 249

def completed(*messages)
  job_status({
               'status' => STATUS_COMPLETED,
               'message' => "Completed at #{Time.now}"
             }, *messages)
  @logger.info("Job #{@uuid}: #{messages.join(' ')}")
end

#failed(*messages) ⇒ Object

set the status to ‘failed’ passing along any additional messages



243
244
245
246
# File 'lib/resque/plugins/state.rb', line 243

def failed(*messages)
  job_status({ 'status' => STATUS_FAILED }, *messages)
  @logger.error("Job #{@uuid}: #{messages.join(' ')}")
end

#initialize(uuid, options = {}) ⇒ Object

Create a new instance with uuid and options



154
155
156
157
158
# File 'lib/resque/plugins/state.rb', line 154

def initialize(uuid, options = {})
  @uuid    = uuid
  @options = options
  @logger = Resque.logger
end

#kill!Object

kill the current job, setting the status to ‘killed’ and raising Killed

Raises:



259
260
261
262
263
264
265
# File 'lib/resque/plugins/state.rb', line 259

def kill!
  messages = ["Killed at #{Time.now}"]
  job_status('status' => STATUS_KILLED,
             'message' => messages[0])
  @logger.error("Job #{@uuid}: #{messages.join(' ')}")
  raise Killed
end

#nameObject



196
197
198
# File 'lib/resque/plugins/state.rb', line 196

def name
  "#{self.class.name}(#{options.inspect unless options.empty?})"
end

#pause!Object

pause the current job, setting the status to ‘paused’ and sleeping 10 seconds

Raises:



269
270
271
272
273
274
275
276
277
278
279
280
# File 'lib/resque/plugins/state.rb', line 269

def pause!
  Resque::Plugins::State::Hash.pause(uuid)
  messages = ["Paused at #{Time.now}"]
  job_status('status' => STATUS_PAUSED,
             'message' => messages[0])
  raise Killed if @testing # Don't loop or complete during testing
  @logger.info("Job #{@uuid}: #{messages.join(' ')}")
  while should_pause?
    kill! if should_kill?
    sleep 10
  end
end

#safe_perform!Object

Run by the Resque::Worker when processing this job. It wraps the perform method ensuring that the final status of the job is set regardless of error. If an error occurs within the job’s work, it will set the status as failed and re-raise the error.



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/resque/plugins/state.rb', line 164

def safe_perform!
  job_status('status' => STATUS_WORKING)
  messages = ['Job starting']
  @logger.info("#{@uuid}: #{messages.join(' ')}")
  perform
  if status && status.failed?
    on_failure(status.message) if respond_to?(:on_failure)
    return
  elsif status && !status.completed?
    completed
  end
  on_success if respond_to?(:on_success)
rescue Killed
  Resque::Plugins::State::Hash.killed(uuid)
  on_killed if respond_to?(:on_killed)
rescue => e
  failed("The task failed because of an error: #{e}")
  raise e unless respond_to?(:on_failure)
  on_failure(e)
end

#should_kill?Boolean

Checks against the kill list if this specific job instance should be killed on the next iteration

Returns:

  • (Boolean)


202
203
204
# File 'lib/resque/plugins/state.rb', line 202

def should_kill?
  Resque::Plugins::State::Hash.should_kill?(uuid)
end

#should_pause?Boolean

Checks against the pause list if this specific job instance should be paused on the next iteration

Returns:

  • (Boolean)


208
209
210
# File 'lib/resque/plugins/state.rb', line 208

def should_pause?
  Resque::Plugins::State::Hash.should_pause?(uuid)
end

#statusObject

get the Resque::Plugins::State::Hash object for the current uuid



192
193
194
# File 'lib/resque/plugins/state.rb', line 192

def status
  Resque::Plugins::State::Hash.get(uuid)
end

#status=(new_status) ⇒ Object

Set the jobs status. Can take an array of strings or hashes that are merged (in order) into a final status hash.



187
188
189
# File 'lib/resque/plugins/state.rb', line 187

def status=(new_status)
  Resque::Plugins::State::Hash.set(uuid, *new_status)
end

#tick(*messages) ⇒ Object

sets the status of the job for the current itteration. You should use the at method if you have actual numbers to track the iteration count. This will kill or pause the job if it has been added to either list with Resque::Plugins::State::Hash.pause() or Resque::Plugins::State::Hash.kill() respectively



232
233
234
235
236
237
238
239
240
# File 'lib/resque/plugins/state.rb', line 232

def tick(*messages)
  kill! if should_kill?
  if should_pause?
    pause!
  else
    job_status({ 'status' => STATUS_WORKING }, *messages)
    @logger.info("Job #{@uuid}: #{messages.join(' ')}")
  end
end