Module: Resque::Plugins::State
- Included in: JobWithState
- Defined in: lib/resque/plugins/state.rb, lib/resque/plugins/state/hash.rb
Overview
Resque::Plugins::State is a module that your job classes include. It provides instance helper methods for updating a job's status from within the running job, as well as class methods for creating and queuing jobs.
To get this functionality, include Resque::Plugins::State in your class and implement a perform method.
For example:
    class ExampleJob
      include Resque::Plugins::State

      def perform
        # 'num' is read from the options the job was created with
        num = options['num']
        i = 0
        while i < num
          i += 1
          at(i, num)
        end
        completed("Finished!")
      end
    end
This job iterates num times, updating the status as it goes. At the end, it sets the status to completed, telling anyone watching the job that it's done.
Defined Under Namespace
Modules: ClassMethods
Classes: Hash, Killed, NotANumber, Revert
Constant Summary
- VERSION = '1.1.1'.freeze
- STATUS_QUEUED = 'queued'.freeze
- STATUS_WORKING = 'working'.freeze
- STATUS_COMPLETED = 'completed'.freeze
- STATUS_FAILED = 'failed'.freeze
- STATUS_KILLED = 'killed'.freeze
- STATUS_PAUSED = 'paused'.freeze
- STATUS_WAITING = 'waiting'.freeze
- STATUS_REVERTING = 'reverting'.freeze
- STATUS_REVERTED = 'reverted'.freeze
- STATUSES = [STATUS_QUEUED, STATUS_WORKING, STATUS_COMPLETED, STATUS_FAILED, STATUS_KILLED, STATUS_PAUSED, STATUS_WAITING, STATUS_REVERTING, STATUS_REVERTED].freeze
Instance Attribute Summary
- #options ⇒ Object (readonly)
  Returns the value of attribute options.
- #uuid ⇒ Object (readonly)
  Returns the value of attribute uuid.
Class Method Summary
- .included(base) ⇒ Object
Instance Method Summary
- #at(num, total, *messages) ⇒ Object
  Set the status of the job for the current iteration.
- #completed(*messages) ⇒ Object
  Set the status to ‘completed’, passing along any additional messages.
- #failed(*messages) ⇒ Object
  Set the status to ‘failed’, passing along any additional messages.
- #initialize(uuid, options = {}) ⇒ Object
  Create a new instance with uuid and options.
- #kill! ⇒ Object
  Kill the current job, setting the status to ‘killed’ and raising Killed.
- #lock!(key = nil) ⇒ Object
  Lock against a provided or automatic key to prevent duplicate jobs.
- #locked?(key) ⇒ Boolean
  Checks against the lock list if this specific job instance should wait before starting.
- #name ⇒ Object
- #pause!(pause_text = nil) ⇒ Object
  Pause the current job, setting the status to ‘paused’ and sleeping in 10-second intervals while it remains paused.
- #revert! ⇒ Object
  Revert the current job, setting the status to ‘reverting’ and raising Revert.
- #reverted(*messages) ⇒ Object
  Set the status to ‘reverted’, passing along any additional messages.
- #safe_perform! ⇒ Object
  Run by the Resque::Worker when processing this job.
- #should_kill? ⇒ Boolean
  Checks against the kill list if this specific job instance should be killed on the next iteration.
- #should_pause? ⇒ Boolean
  Checks against the pause list if this specific job instance should be paused on the next iteration.
- #should_revert? ⇒ Boolean
  Checks against the revert list if this specific job instance should be reverted on the next iteration.
- #status ⇒ Object
  Get the Resque::Plugins::State::Hash object for the current uuid.
- #status=(new_status) ⇒ Object
  Set the job's status.
- #tick(*messages) ⇒ Object
  Set the status of the job for the current iteration.
- #unlock!(key = nil) ⇒ Object
  Unlock the provided or automatic key at the end of a job.
Instance Attribute Details
#options ⇒ Object (readonly)
Returns the value of attribute options.
    # File 'lib/resque/plugins/state.rb', line 63
    def options
      @options
    end
#uuid ⇒ Object (readonly)
Returns the value of attribute uuid.
    # File 'lib/resque/plugins/state.rb', line 63
    def uuid
      @uuid
    end
Class Method Details
.included(base) ⇒ Object
    # File 'lib/resque/plugins/state.rb', line 65
    def self.included(base)
      base.extend(ClassMethods)
    end
Instance Method Details
#at(num, total, *messages) ⇒ Object
Set the status of the job for the current iteration. num and total are passed to the status, as well as any messages. This will kill the job if it has been added to the kill list with Resque::Plugins::State::Hash.kill().
    # File 'lib/resque/plugins/state.rb', line 247
    def at(num, total, *messages)
      if total.to_f <= 0.0
        raise(NotANumber,
              "Called at() with total=#{total} which is not a number")
      end
      tick({
        'num' => num,
        'total' => total
      }, *messages)
    end
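The guard at the top of at relies on to_f, which returns 0.0 for nil and for strings that do not start with a number, so zero, negative, nil, and non-numeric totals are all rejected the same way. The following is a minimal sketch of that check in isolation. NotANumber here is a local stand-in for the plugin's exception class, and check_total! and rejected? are hypothetical helpers introduced only for illustration.

```ruby
# Local stand-in for Resque::Plugins::State::NotANumber so the sketch runs
# without the gem loaded.
class NotANumber < StandardError; end

# Hypothetical helper mirroring the guard at the top of #at.
def check_total!(total)
  if total.to_f <= 0.0
    raise NotANumber, "Called at() with total=#{total} which is not a number"
  end
  total
end

# Hypothetical helper: true when the guard would raise for this total.
def rejected?(total)
  check_total!(total)
  false
rescue NotANumber
  true
end

[0, -5, nil, 'abc', '10', 2.5].each do |total|
  puts "#{total.inspect}: #{rejected?(total) ? 'rejected' : 'accepted'}"
end
```

Note that a numeric string such as '10' passes the guard, because only the to_f value is checked.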
#completed(*messages) ⇒ Object
Set the status to ‘completed’, passing along any additional messages.
    # File 'lib/resque/plugins/state.rb', line 290
    def completed(*messages)
      job_status({
        'status' => STATUS_COMPLETED,
        'message' => "Completed at #{Time.now}"
      }, *messages)
      @logger.info("Job #{@uuid}: #{messages.join(' ')}")
    end
#failed(*messages) ⇒ Object
set the status to ‘failed’ passing along any additional messages
    # File 'lib/resque/plugins/state.rb', line 278
    def failed(*messages)
      job_status({ 'status' => STATUS_FAILED }, *messages)
      @logger.error("Job #{@uuid}: #{messages.join(' ')}")
    end
#initialize(uuid, options = {}) ⇒ Object
Create a new instance with uuid and options.
    # File 'lib/resque/plugins/state.rb', line 164
    def initialize(uuid, options = {})
      @reverting = false
      @uuid = uuid
      @options = options
      @logger = Resque.logger
    end
#kill! ⇒ Object
kill the current job, setting the status to ‘killed’ and raising Killed
    # File 'lib/resque/plugins/state.rb', line 300
    def kill!
      messages = ["Killed at #{Time.now}"]
      job_status('status' => STATUS_KILLED,
                 'message' => messages[0])
      @logger.error("Job #{@uuid}: #{messages.join(' ')}")
      raise Killed
    end
#lock!(key = nil) ⇒ Object
lock against a provided or automatic key to prevent duplicate jobs
    # File 'lib/resque/plugins/state.rb', line 344
    def lock!(key = nil)
      lock = Digest::SHA1.hexdigest @options.to_json
      lock = key if key
      if locked?(lock)
        messages = ["Waiting at #{Time.now} due to existing job"]
        job_status('status' => STATUS_WAITING,
                   'message' => messages[0])
        while locked?(lock)
          kill! if should_kill?
          pause! if should_pause?
          sleep 10
        end
      else
        Resque::Plugins::State::Hash.lock(lock)
      end
    end
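When no key is given, lock! derives one from the SHA1 hex digest of the options serialized as JSON, so two jobs created with identical options contend for the same lock. The sketch below reproduces only that key derivation, using Ruby's standard digest and json libraries. lock_key_for is a hypothetical helper, and the option names are made up for illustration.

```ruby
require 'digest/sha1'
require 'json'

# Hypothetical helper reproducing how lock! picks its key: an explicit key
# wins, otherwise the key is the SHA1 hex digest of the options as JSON.
def lock_key_for(options, key = nil)
  key || Digest::SHA1.hexdigest(options.to_json)
end

first   = lock_key_for({ 'account_id' => 42, 'mode' => 'full' })
second  = lock_key_for({ 'account_id' => 42, 'mode' => 'full' })
swapped = lock_key_for({ 'mode' => 'full', 'account_id' => 42 })

# Same options, same key: the second job would wait on the first.
puts first == second
# Ruby hashes serialize in insertion order, so reordered keys give a
# different JSON string and therefore a different lock key.
puts first == swapped
# An explicit key bypasses the digest entirely.
puts lock_key_for({ 'account_id' => 42 }, 'nightly-sync')
```

Because the automatic key depends on key order in the serialized options, passing an explicit key may be the safer choice when the options hash is built in more than one place.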
#locked?(key) ⇒ Boolean
Checks against the lock list if this specific job instance should wait before starting
    # File 'lib/resque/plugins/state.rb', line 239
    def locked?(key)
      Resque::Plugins::State::Hash.locked?(key)
    end
#name ⇒ Object
    # File 'lib/resque/plugins/state.rb', line 215
    def name
      "#{self.class.name}(#{options.inspect unless options.empty?})"
    end
#pause!(pause_text = nil) ⇒ Object
Pause the current job, setting the status to ‘paused’ and sleeping in 10-second intervals while it remains paused.
    # File 'lib/resque/plugins/state.rb', line 329
    def pause!(pause_text = nil)
      Resque::Plugins::State::Hash.pause(uuid)
      messages = ["Paused at #{Time.now} #{pause_text}"]
      job_status('status' => STATUS_PAUSED,
                 'message' => messages[0])
      raise Killed if @testing # Don't loop or complete during testing
      @logger.info("Job #{@uuid}: #{messages.join(' ')}")
      while should_pause?
        kill! if should_kill?
        revert! if should_revert?
        sleep 10
      end
    end
#revert! ⇒ Object
revert the current job, setting the status to ‘reverting’ and raising Revert
    # File 'lib/resque/plugins/state.rb', line 310
    def revert!
      if respond_to?(:on_revert)
        messages = ["Reverting at #{Time.now}"]
        Resque::Plugins::State::Hash.unpause(uuid) if should_pause?
        @reverting = true
        job_status('status' => STATUS_REVERTING,
                   'message' => messages[0])
        @logger.info("Job #{@uuid}: #{messages.join(' ')}")
        raise Revert
      else
        @logger.error("Job #{@uuid}: Attempted revert on job with no revert"\
                      " support")
        Resque::Plugins::State::Hash.no_revert(@uuid)
        pause!('This job does not support revert functionality')
      end
    end
#reverted(*messages) ⇒ Object
set the status to ‘reverted’ passing along any additional messages
    # File 'lib/resque/plugins/state.rb', line 284
    def reverted(*messages)
      job_status({ 'status' => STATUS_REVERTED }, *messages)
      @logger.error("Job #{@uuid}: #{messages.join(' ')}")
    end
#safe_perform! ⇒ Object
Run by the Resque::Worker when processing this job. It wraps the perform method, ensuring that the final status of the job is set regardless of error. If an error occurs within the job's work, it sets the status to failed and re-raises the error, unless the job defines on_failure, in which case the error is passed to that hook instead.
    # File 'lib/resque/plugins/state.rb', line 175
    def safe_perform!
      job_status('status' => STATUS_WORKING)
      messages = ['Job starting']
      @logger.info("#{@uuid}: #{messages.join(' ')}")
      perform
      if status && status.failed?
        on_failure(status.message) if respond_to?(:on_failure)
        return
      elsif status && !status.completed?
        completed
      end
      on_success if respond_to?(:on_success)
    rescue Killed
      Resque::Plugins::State::Hash.killed(uuid)
      on_killed if respond_to?(:on_killed)
    rescue Revert
      Resque::Plugins::State::Hash.revert(uuid)
      on_revert
      messages = ["Reverted at #{Time.now}"]
      job_status('status' => STATUS_REVERTED,
                 'message' => messages[0])
    rescue => e
      messages = ["Failed at #{Time.now}: #{e.message}"]
      @logger.error("Job #{@uuid}: #{messages.join(' ')}")
      failed("The task failed because of an error: #{e}")
      raise e unless respond_to?(:on_failure)
      on_failure(e)
    end
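The listing above dispatches to four optional hooks on the job: on_success, on_killed, on_revert, and on_failure. The sketch below is a simplified, Redis-free imitation of that rescue dispatch, meant only to show which hook fires for which outcome. HookDemo, hooks_for, and the local Killed and Revert classes are stand-ins invented for this example, and all status bookkeeping is left out.

```ruby
# Local stand-ins for the plugin's Killed and Revert exceptions.
class Killed < RuntimeError; end
class Revert < RuntimeError; end

# Hypothetical job-like class that records which hooks ran instead of
# updating a status hash.
class HookDemo
  attr_reader :calls

  def initialize(&work)
    @work = work
    @calls = []
  end

  def perform
    @work.call
  end

  def on_success
    @calls << :on_success
  end

  def on_killed
    @calls << :on_killed
  end

  def on_revert
    @calls << :on_revert
  end

  def on_failure(error)
    @calls << [:on_failure, error.message]
  end

  # Same rescue ordering as the listing above, minus the status updates.
  def safe_perform!
    perform
    on_success if respond_to?(:on_success)
  rescue Killed
    on_killed if respond_to?(:on_killed)
  rescue Revert
    on_revert
  rescue => e
    raise e unless respond_to?(:on_failure)
    on_failure(e)
  end
end

# Hypothetical helper: run a block as the job body and return the hooks hit.
def hooks_for(&work)
  job = HookDemo.new(&work)
  job.safe_perform!
  job.calls
end

p hooks_for { :ok }
p hooks_for { raise Killed }
p hooks_for { raise Revert }
p hooks_for { raise 'boom' }
```

In the real method there is one more path that this sketch omits: if perform returns but the stored status is already failed, on_failure receives the status message and on_success is skipped.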
#should_kill? ⇒ Boolean
Checks against the kill list if this specific job instance should be killed on the next iteration
    # File 'lib/resque/plugins/state.rb', line 221
    def should_kill?
      Resque::Plugins::State::Hash.should_kill?(uuid)
    end
#should_pause? ⇒ Boolean
Checks against the pause list if this specific job instance should be paused on the next iteration
    # File 'lib/resque/plugins/state.rb', line 227
    def should_pause?
      Resque::Plugins::State::Hash.should_pause?(uuid)
    end
#should_revert? ⇒ Boolean
Checks against the revert list if this specific job instance should be reverted on the next iteration
    # File 'lib/resque/plugins/state.rb', line 233
    def should_revert?
      Resque::Plugins::State::Hash.should_revert?(uuid)
    end
#status ⇒ Object
get the Resque::Plugins::State::Hash object for the current uuid
    # File 'lib/resque/plugins/state.rb', line 211
    def status
      Resque::Plugins::State::Hash.get(uuid)
    end
#status=(new_status) ⇒ Object
Set the job's status. Can take an array of strings or hashes that are merged (in order) into a final status hash.
    # File 'lib/resque/plugins/state.rb', line 206
    def status=(new_status)
      Resque::Plugins::State::Hash.set(uuid, *new_status)
    end
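To illustrate what "merged (in order)" means, here is a rough sketch of an in-order merge where later entries override earlier ones. merge_status is a hypothetical stand-in, not the plugin's code. In particular, treating a bare string as the 'message' entry is an assumption, since this page does not show how Resque::Plugins::State::Hash handles strings.

```ruby
# Hypothetical stand-in for the merge described above.
# ASSUMPTION: a bare string becomes the 'message' entry of the status.
def merge_status(*parts)
  parts.reduce({}) do |final, part|
    part = { 'message' => part } if part.is_a?(String)
    final.merge(part || {})
  end
end

status = merge_status(
  { 'status' => 'working', 'num' => 1, 'total' => 10 },
  { 'num' => 2 },          # overrides the earlier 'num'
  'Second chunk done'      # assumed to land under 'message'
)
p status
```

Under these assumptions, the later 'num' value replaces the earlier one while 'status' and 'total' carry through unchanged.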
#tick(*messages) ⇒ Object
Set the status of the job for the current iteration. Use the at method instead if you have actual numbers to track the iteration count. This will kill or pause the job if it has been added to either list with Resque::Plugins::State::Hash.kill() or Resque::Plugins::State::Hash.pause() respectively.
    # File 'lib/resque/plugins/state.rb', line 263
    def tick(*messages)
      kill! if should_kill?
      if should_pause?
        pause!
      elsif should_revert?
        return revert! unless @reverting
        job_status({ 'status' => STATUS_REVERTING }, *messages)
        @logger.info("Job #{@uuid}: #{messages.join(' ')}")
      else
        job_status({ 'status' => STATUS_WORKING }, *messages)
        @logger.info("Job #{@uuid}: #{messages.join(' ')}")
      end
    end
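The branch order in tick gives kill requests priority over pause requests, and pause requests priority over revert requests. The sketch below restates that precedence as a pure function so it can be read at a glance. tick_action and its symbols are hypothetical names. The keyword flags stand in for should_kill?, should_pause?, should_revert?, and the @reverting instance variable.

```ruby
# Hypothetical pure function naming the branch tick would take for a given
# combination of flags.
def tick_action(kill: false, pause: false, revert: false, reverting: false)
  return :kill if kill      # kill! raises Killed, so nothing else runs
  return :pause if pause    # pause! blocks while the job stays paused
  if revert
    # The first tick after a revert request starts the revert. Later ticks,
    # made while already reverting, only report the 'reverting' status.
    return reverting ? :report_reverting : :start_revert
  end
  :report_working
end

p tick_action(kill: true, pause: true)
p tick_action(pause: true, revert: true)
p tick_action(revert: true)
p tick_action(revert: true, reverting: true)
p tick_action
```

One consequence of this ordering is that a paused job does not begin reverting from tick itself; the pause loop in pause! is what checks for kill and revert requests while the job sleeps.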