Class: Resque::Worker
Overview
A Resque Worker processes jobs. On platforms that support fork(2), the worker will fork off a child to process each job. This ensures a clean slate when beginning the next job and cuts down on gradual memory growth as well as low-level failures.
It also ensures workers are always listening to signals from you, their master, and can react accordingly.
Instance Attribute Summary
-
#cant_fork ⇒ Object
Boolean indicating whether this worker can or can not fork.
-
#run_at_exit_hooks ⇒ Object
When set to true, forked workers will exit with `exit`, calling any `at_exit` code handlers that have been registered in the application.
-
#term_child ⇒ Object
Decides whether to use the new_kill_child logic.
-
#term_timeout ⇒ Object
Returns the value of attribute term_timeout.
-
#to_s ⇒ Object
(also: #id)
The string representation is the same as the id for this worker instance.
Class Method Summary
-
.all ⇒ Object
Returns an array of all worker objects.
-
.attach(worker_id) ⇒ Object
Alias of `find`.
-
.exists?(worker_id) ⇒ Boolean
Given a string worker id, return a boolean indicating whether the worker exists.
-
.find(worker_id) ⇒ Object
Returns a single worker object.
- .redis ⇒ Object
-
.working ⇒ Object
Returns an array of all worker objects currently processing jobs.
Instance Method Summary
-
#==(other) ⇒ Object
Is this worker the same as another worker?
-
#decode(object) ⇒ Object
Given a string, returns a Ruby object.
-
#done_working ⇒ Object
Called when we are done working - clears our `working_on` state and tells Redis we processed a job.
-
#enable_gc_optimizations ⇒ Object
Enables GC Optimizations if you’re running REE.
-
#encode(object) ⇒ Object
Given a Ruby object, returns a string suitable for storage in a queue.
-
#failed ⇒ Object
How many failed jobs has this worker seen? Returns an int.
-
#failed! ⇒ Object
Tells Redis we’ve failed a job.
-
#fork(job) ⇒ Object
Not every platform supports fork.
- #fork_per_job? ⇒ Boolean
- #glob_match(pattern) ⇒ Object
-
#hostname ⇒ Object
chomp’d hostname of this machine.
-
#idle? ⇒ Boolean
Boolean - true if idle, false if not.
-
#initialize(*queues) ⇒ Worker
constructor
Workers should be initialized with an array of string queue names.
- #inspect ⇒ Object
-
#job ⇒ Object
(also: #processing)
Returns a hash explaining the Job we’re currently processing, if any.
-
#kill_child ⇒ Object
Kills the forked child immediately, without remorse.
-
#linux_worker_pids ⇒ Object
Find Resque worker pids on Linux and OS X.
-
#log(message) ⇒ Object
Logs a message to Resque.logger. alias_method can’t be used because info/debug are private methods.
- #log!(message) ⇒ Object
- #logger_severity_deprecation_warning ⇒ Object
-
#new_kill_child ⇒ Object
Kills the forked child immediately with minimal remorse.
-
#pause_processing ⇒ Object
Stop processing jobs after the current one has completed (if we’re currently running one).
-
#paused? ⇒ Boolean
Are we paused?
-
#perform(job) ⇒ Object
Processes a given job in the child.
-
#pid ⇒ Object
Returns Integer PID of running worker.
-
#process(job = nil, &block) ⇒ Object
DEPRECATED.
-
#processed ⇒ Object
How many jobs has this worker processed? Returns an int.
-
#processed! ⇒ Object
Tell Redis we’ve processed a job.
-
#procline(string) ⇒ Object
Given a string, sets the procline ($0) and logs.
-
#prune_dead_workers ⇒ Object
Looks for any workers which should be running on this server and, if they’re not, removes them from Redis.
-
#queues ⇒ Object
Returns a list of queues to use when searching for a job.
-
#reconnect ⇒ Object
Reconnect to Redis to avoid sharing a connection with the parent, retry up to 3 times with increasing delay before giving up.
- #redis ⇒ Object
-
#register_signal_handlers ⇒ Object
Registers the various signal handlers a worker responds to.
-
#register_worker ⇒ Object
Registers ourself as a worker.
-
#report_failed_job(job, exception) ⇒ Object
Reports the exception and marks the job as failed.
-
#reserve ⇒ Object
Attempts to grab a job off one of the provided queues.
-
#run_hook(name, *args) ⇒ Object
Runs a named hook, passing along any arguments.
-
#shutdown ⇒ Object
Schedule this worker for shutdown.
-
#shutdown! ⇒ Object
Kill the child and shutdown immediately.
-
#shutdown? ⇒ Boolean
Should this worker shut down as soon as the current job is finished?
-
#solaris_worker_pids ⇒ Object
Find Resque worker pids on Solaris.
-
#started ⇒ Object
What time did this worker start? Returns the value stored in Redis by #started!, a string representation of a `Time`.
-
#started! ⇒ Object
Tell Redis we’ve started.
-
#startup ⇒ Object
Runs all the methods needed when a worker begins its lifecycle.
-
#state ⇒ Object
Returns a symbol representing the current worker state, which can be either :working or :idle.
-
#unpause_processing ⇒ Object
Start processing jobs again after a pause.
- #unregister_signal_handlers ⇒ Object
-
#unregister_worker(exception = nil) ⇒ Object
Unregisters ourself as a worker.
-
#validate_queues ⇒ Object
A worker must be given a queue, otherwise it won’t know what to do with itself.
-
#verbose ⇒ Object
Deprecated legacy methods for controlling the logging threshold. Use Resque.logger.level instead.
- #verbose=(value) ⇒ Object
- #very_verbose ⇒ Object
- #very_verbose=(value) ⇒ Object
- #will_fork? ⇒ Boolean
-
#windows_worker_pids ⇒ Object
Returns an Array of string pids of all the other workers on this machine.
-
#work(interval = 5.0, &block) ⇒ Object
This is the main workhorse method.
-
#worker_pids ⇒ Object
Returns an Array of string pids of all the other workers on this machine.
-
#working? ⇒ Boolean
Boolean - true if working, false if not.
-
#working_on(job) ⇒ Object
Given a job, tells Redis we’re working on it.
Methods included from Logging
debug, error, fatal, info, log, warn
Constructor Details
#initialize(*queues) ⇒ Worker
Workers should be initialized with an array of string queue names. The order is important: a Worker will check the first queue given for a job. If none is found, it will check the second queue name given. If a job is found, it will be processed. Upon completion, the Worker will again check the first queue given, and so forth. In this way the queue list passed to a Worker on startup defines the priorities of queues.
If passed a single “*”, this Worker will operate on all queues in alphabetical order. Queues can be dynamically added or removed without needing to restart workers using this method.
# File 'lib/resque/worker.rb', line 128
def initialize(*queues)
  @queues = queues.map { |queue| queue.to_s.strip }
  @shutdown = nil
  @paused = nil
  validate_queues
end
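The priority rule described for the constructor can be illustrated without Redis. The following is a minimal sketch, not Resque's implementation: `reserve_from` is a hypothetical helper, and a plain Hash of arrays stands in for the Redis-backed queues.

```ruby
# Check each queue in the order given and return the first job found.
# The order of `queue_names` therefore acts as the priority list.
# `queues` is a hypothetical in-memory stand-in for Redis lists.
def reserve_from(queue_names, queues)
  queue_names.each do |name|
    job = (queues[name] || []).shift
    return [name, job] if job
  end
  nil
end

queues = {
  "critical" => ["charge_card"],
  "default"  => ["send_email", "resize_image"]
}

# "critical" is drained first; "default" is only consulted once it is empty.
3.times { puts reserve_from(%w[critical default], queues).inspect }
```

Because the scan restarts from the first queue name on every call, a job arriving on an earlier queue is always picked up before the remaining jobs on later queues.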
Instance Attribute Details
#cant_fork ⇒ Object
Boolean indicating whether this worker can or can not fork. Automatically set if a fork(2) fails.
# File 'lib/resque/worker.rb', line 50
def cant_fork
  @cant_fork
end
#run_at_exit_hooks ⇒ Object
When set to true, forked workers will exit with `exit`, calling any `at_exit` code handlers that have been registered in the application. Otherwise, forked workers exit with `exit!`.
# File 'lib/resque/worker.rb', line 59
def run_at_exit_hooks
  @run_at_exit_hooks
end
#term_child ⇒ Object
Decides whether to use the new_kill_child logic.
# File 'lib/resque/worker.rb', line 55
def term_child
  @term_child
end
#term_timeout ⇒ Object
Returns the value of attribute term_timeout.
# File 'lib/resque/worker.rb', line 52
def term_timeout
  @term_timeout
end
#to_s ⇒ Object Also known as: id
The string representation is the same as the id for this worker instance. Can be used with `Worker.find`.
# File 'lib/resque/worker.rb', line 656
def to_s
  @to_s ||= "#{hostname}:#{pid}:#{@queues.join(',')}"
end
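The id format built by #to_s and the way .find reads the queue list back out of it can be sketched as follows. `worker_id` and `queues_from_id` are hypothetical helper names used only for illustration; they are not part of Resque.

```ruby
require "socket"

# Build an id in the "hostname:pid:queue1,queue2" shape used by #to_s.
def worker_id(hostname, pid, queues)
  "#{hostname}:#{pid}:#{queues.join(',')}"
end

# Recover the queue list from an id the way Worker.find does:
# take the last colon-separated field and split it on commas.
def queues_from_id(id)
  id.split(":")[-1].split(",")
end

id = worker_id(Socket.gethostname, Process.pid, %w[critical default])
puts id
puts queues_from_id(id).inspect
```

Since only the last colon-separated field is read, this parsing assumes queue names themselves contain no colons.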
Class Method Details
.all ⇒ Object
Returns an array of all worker objects.
# File 'lib/resque/worker.rb', line 64
def self.all
  Array(redis.smembers(:workers)).map { |id| find(id) }.compact
end
.attach(worker_id) ⇒ Object
Alias of `find`.
# File 'lib/resque/worker.rb', line 107
def self.attach(worker_id)
  find(worker_id)
end
.exists?(worker_id) ⇒ Boolean
Given a string worker id, return a boolean indicating whether the worker exists
# File 'lib/resque/worker.rb', line 113
def self.exists?(worker_id)
  redis.sismember(:workers, worker_id)
end
.find(worker_id) ⇒ Object
Returns a single worker object. Accepts a string id.
# File 'lib/resque/worker.rb', line 95
def self.find(worker_id)
  if exists? worker_id
    queues = worker_id.split(':')[-1].split(',')
    worker = new(*queues)
    worker.to_s = worker_id
    worker
  else
    nil
  end
end
.working ⇒ Object
Returns an array of all worker objects currently processing jobs.
# File 'lib/resque/worker.rb', line 70
def self.working
  names = all
  return [] unless names.any?
  names.map! { |name| "worker:#{name}" }
  reportedly_working = {}
  begin
    reportedly_working = redis.mapped_mget(*names).reject do |key, value|
      value.nil? || value.empty?
    end
  rescue Redis::Distributed::CannotDistribute
    names.each do |name|
      value = redis.get name
      reportedly_working[name] = value unless value.nil? || value.empty?
    end
  end
  reportedly_working.keys.map do |key|
    find key.sub("worker:", '')
  end.compact
end
Instance Method Details
#==(other) ⇒ Object
Is this worker the same as another worker?
# File 'lib/resque/worker.rb', line 646
def ==(other)
  to_s == other.to_s
end
#decode(object) ⇒ Object
Given a string, returns a Ruby object.
# File 'lib/resque/worker.rb', line 34
def decode(object)
  return unless object
  begin
    if MultiJson.respond_to?(:dump) && MultiJson.respond_to?(:load)
      MultiJson.load object
    else
      MultiJson.decode object
    end
  rescue ::MultiJson::DecodeError => e
    raise DecodeException, e.message, e.backtrace
  end
end
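The decode pattern (pass nil through, parse otherwise, and re-raise parser failures as a library-specific error) can be tried with only the standard library. This sketch assumes Ruby's bundled `json` in place of MultiJson; `safe_decode` and `DecodeError` are hypothetical names standing in for #decode and DecodeException.

```ruby
require "json"

# Hypothetical stand-in for the library's own decode exception.
class DecodeError < StandardError; end

# Return nil for nil input, otherwise parse the JSON string.
# Parser failures are re-raised as DecodeError, keeping the original
# message and backtrace.
def safe_decode(string)
  return unless string
  JSON.parse(string)
rescue JSON::ParserError => e
  raise DecodeError, e.message, e.backtrace
end

puts safe_decode('{"class":"Mailer","args":[1]}').inspect

begin
  safe_decode("{oops")
rescue DecodeError => e
  puts "could not decode: #{e.class}"
end
```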
#done_working ⇒ Object
Called when we are done working - clears our `working_on` state and tells Redis we processed a job.
# File 'lib/resque/worker.rb', line 576
def done_working
  redis.pipelined do
    processed!
    redis.del("worker:#{self}")
  end
end
#enable_gc_optimizations ⇒ Object
Enables GC Optimizations if you’re running REE. www.rubyenterpriseedition.com/faq.html#adapt_apps_for_cow
# File 'lib/resque/worker.rb', line 354
def enable_gc_optimizations
  if GC.respond_to?(:copy_on_write_friendly=)
    GC.copy_on_write_friendly = true
  end
end
#encode(object) ⇒ Object
Given a Ruby object, returns a string suitable for storage in a queue.
# File 'lib/resque/worker.rb', line 25
def encode(object)
  if MultiJson.respond_to?(:dump) && MultiJson.respond_to?(:load)
    MultiJson.dump object
  else
    MultiJson.encode object
  end
end
#failed ⇒ Object
How many failed jobs has this worker seen? Returns an int.
# File 'lib/resque/worker.rb', line 595
def failed
  Stat["failed:#{self}"]
end
#failed! ⇒ Object
Tells Redis we’ve failed a job.
# File 'lib/resque/worker.rb', line 600
def failed!
  Stat << "failed"
  Stat << "failed:#{self}"
end
#fork(job) ⇒ Object
Not every platform supports fork. Here we do our magic to determine if yours does.
# File 'lib/resque/worker.rb', line 318
def fork(job)
  return if @cant_fork
  # Only run before_fork hooks if we're actually going to fork
  # (after checking @cant_fork)
  run_hook :before_fork, job
  begin
    # IronRuby doesn't support `Kernel.fork` yet
    if Kernel.respond_to?(:fork)
      Kernel.fork if will_fork?
    else
      raise NotImplementedError
    end
  rescue NotImplementedError
    @cant_fork = true
    nil
  end
end
#fork_per_job? ⇒ Boolean
# File 'lib/resque/worker.rb', line 635
def fork_per_job?
  ENV["FORK_PER_JOB"] != 'false'
end
#glob_match(pattern) ⇒ Object
# File 'lib/resque/worker.rb', line 310
def glob_match(pattern)
  Resque.queues.select do |queue|
    File.fnmatch?(pattern, queue)
  end.sort
end
#hostname ⇒ Object
chomp’d hostname of this machine
# File 'lib/resque/worker.rb', line 662
def hostname
  Socket.gethostname
end
#idle? ⇒ Boolean
Boolean - true if idle, false if not
# File 'lib/resque/worker.rb', line 627
def idle?
  state == :idle
end
#inspect ⇒ Object
# File 'lib/resque/worker.rb', line 650
def inspect
  "#<Worker #{to_s}>"
end
#job ⇒ Object Also known as: processing
Returns a hash explaining the Job we’re currently processing, if any.
# File 'lib/resque/worker.rb', line 616
def job
  decode(redis.get("worker:#{self}")) || {}
end
#kill_child ⇒ Object
Kills the forked child immediately, without remorse. The job it is processing will not be completed.
# File 'lib/resque/worker.rb', line 438
def kill_child
  if @child
    log! "Killing child at #{@child}"
    if `ps -o pid,state -p #{@child}`
      Process.kill("KILL", @child) rescue nil
    else
      log! "Child #{@child} not found, restarting."
      shutdown
    end
  end
end
#linux_worker_pids ⇒ Object
Find Resque worker pids on Linux and OS X.
# File 'lib/resque/worker.rb', line 692
def linux_worker_pids
  `ps -A -o pid,command | grep "[r]esque" | grep -v "resque-web"`.split("\n").map do |line|
    line.split(' ')[0]
  end
end
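The shell pipeline above filters `ps` output for lines mentioning resque, drops resque-web, and keeps the first column. The `[r]esque` pattern is a common grep idiom: it matches "resque" but not the grep command's own command line, which contains the literal text "[r]esque". The same filtering can be sketched in plain Ruby over captured text; `resque_pids` and the sample listing are hypothetical and only illustrate the parsing step.

```ruby
# Extract pids from `ps -A -o pid,command`-style text, keeping lines that
# mention "resque" but not "resque-web". The pid is the first
# whitespace-separated field of each line.
def resque_pids(ps_output)
  ps_output.split("\n").select do |line|
    line.include?("resque") && !line.include?("resque-web")
  end.map { |line| line.split(" ")[0] }
end

# Made-up sample listing for illustration only.
sample = <<~PS
    101 resque-1.0: Waiting for default
    202 ruby resque-web
    303 /usr/bin/vim notes.txt
PS

puts resque_pids(sample).inspect
```

As in the original method, the pids come back as strings, which is what #prune_dead_workers compares against the pid field of each worker id.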
#log(message) ⇒ Object
Logs a message to Resque.logger. alias_method can’t be used because info/debug are private methods.
# File 'lib/resque/worker.rb', line 722
def log(message)
  info(message)
end
#log!(message) ⇒ Object
# File 'lib/resque/worker.rb', line 726
def log!(message)
  debug(message)
end
#logger_severity_deprecation_warning ⇒ Object
# File 'lib/resque/worker.rb', line 770
def logger_severity_deprecation_warning
  return if $TESTING
  return if $warned_logger_severity_deprecation
  Kernel.warn "*** DEPRECATION WARNING: Resque::Worker#verbose and #very_verbose are deprecated. Please set Resque.logger.level instead"
  Kernel.warn "Called from: #{caller[0..5].join("\n\t")}"
  $warned_logger_severity_deprecation = true
  nil
end
#new_kill_child ⇒ Object
Kills the forked child immediately with minimal remorse. The job it is processing will not be completed. Sends the child a TERM signal, waits up to term_timeout seconds (polling every 0.1 seconds), and then sends a KILL signal if it has not quit.
# File 'lib/resque/worker.rb', line 453
def new_kill_child
  if @child
    unless Process.waitpid(@child, Process::WNOHANG)
      log! "Sending TERM signal to child #{@child}"
      Process.kill("TERM", @child)
      (term_timeout.to_f * 10).round.times do |i|
        sleep(0.1)
        return if Process.waitpid(@child, Process::WNOHANG)
      end
      log! "Sending KILL signal to child #{@child}"
      Process.kill("KILL", @child)
    else
      log! "Child #{@child} already quit."
    end
  end
rescue SystemCallError
  log! "Child #{@child} already quit and reaped."
end
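The TERM-then-KILL escalation can be tried in isolation on a platform that supports fork(2). This is a simplified sketch under that assumption, not the worker's own code: `stop_child` is a hypothetical helper, it omits logging and the SystemCallError handling, and unlike #new_kill_child it also reaps the child after sending KILL so that it can report which signal ended the process.

```ruby
# Ask a child process to stop: send TERM, poll for up to `timeout` seconds,
# then send KILL if it is still running.
# Returns :term or :kill depending on which signal ended the child.
def stop_child(pid, timeout)
  Process.kill("TERM", pid)
  (timeout.to_f * 10).round.times do
    sleep(0.1)
    # With WNOHANG, waitpid returns nil while the child is still running.
    return :term if Process.waitpid(pid, Process::WNOHANG)
  end
  Process.kill("KILL", pid)
  Process.waitpid(pid) # reap the killed child
  :kill
end

child = fork { sleep 60 } # a child that does not ignore TERM
puts stop_child(child, 2)
```

Polling with WNOHANG keeps the parent responsive during the grace period instead of blocking in waitpid until the child exits.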
#pause_processing ⇒ Object
Stop processing jobs after the current one has completed (if we’re currently running one).
# File 'lib/resque/worker.rb', line 479
def pause_processing
  log "USR2 received; pausing job processing"
  @paused = true
end
#paused? ⇒ Boolean
Are we paused?
# File 'lib/resque/worker.rb', line 473
def paused?
  @paused
end
#perform(job) ⇒ Object
Processes a given job in the child.
# File 'lib/resque/worker.rb', line 247
def perform(job)
  begin
    run_hook :after_fork, job if will_fork?
    job.perform
  rescue Object => e
    report_failed_job(job,e)
  else
    log "done: #{job.inspect}"
  ensure
    yield job if block_given?
  end
end
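The control flow of #perform relies on Ruby's rescue/else/ensure ordering: the else branch runs only when no exception was raised, and ensure runs in both cases. A minimal sketch of that shape follows. `run_job` is a hypothetical helper, the job is any callable, and it rescues StandardError rather than Object as the original does.

```ruby
# Run a callable job: record a failure or a success in `log`, then always
# yield the job to the optional block, whatever the outcome.
def run_job(job, log)
  begin
    job.call
  rescue StandardError => e
    log << "failed: #{e.message}"   # only on error
  else
    log << "done"                   # only on success
  ensure
    yield job if block_given?       # always
  end
end

log = []
run_job(-> { 1 + 1 }, log)       { log << "callback" }
run_job(-> { raise "boom" }, log) { log << "callback" }
puts log.inspect
```

This is why the block passed to #work sees every job, including ones that failed.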
#pid ⇒ Object
Returns Integer PID of running worker
# File 'lib/resque/worker.rb', line 667
def pid
  @pid ||= Process.pid
end
#process(job = nil, &block) ⇒ Object
DEPRECATED. Processes a single job. If none is given, it will try to produce one. Usually run in the child.
# File 'lib/resque/worker.rb', line 221
def process(job = nil, &block)
  return unless job ||= reserve
  job.worker = self
  working_on job
  perform(job, &block)
ensure
  done_working
end
#processed ⇒ Object
How many jobs has this worker processed? Returns an int.
# File 'lib/resque/worker.rb', line 584
def processed
  Stat["processed:#{self}"]
end
#processed! ⇒ Object
Tell Redis we’ve processed a job.
# File 'lib/resque/worker.rb', line 589
def processed!
  Stat << "processed"
  Stat << "processed:#{self}"
end
#procline(string) ⇒ Object
Given a string, sets the procline ($0) and logs. Procline is always in the format of:
resque-VERSION: STRING
# File 'lib/resque/worker.rb', line 715
def procline(string)
  $0 = "resque-#{Resque::Version}: #{string}"
  log! $0
end
#prune_dead_workers ⇒ Object
Looks for any workers which should be running on this server and, if they’re not, removes them from Redis.
This is a form of garbage collection. If a server is killed by a hard shutdown, power failure, or something else beyond our control, the Resque workers will not die gracefully and therefore will leave stale state information in Redis.
By checking the current Redis state against the actual environment, we can determine if Redis is old and clean it up a bit.
# File 'lib/resque/worker.rb', line 500
def prune_dead_workers
  all_workers = Worker.all
  known_workers = worker_pids unless all_workers.empty?
  all_workers.each do |worker|
    host, pid, worker_queues_raw = worker.id.split(':')
    worker_queues = worker_queues_raw.split(",")
    unless @queues.include?("*") || (worker_queues.to_set == @queues.to_set)
      # If the worker we are trying to prune does not belong to the queues
      # we are listening to, we should not touch it.
      # Attempt to prune a worker from different queues may easily result in
      # an unknown class exception, since that worker could easily be even
      # written in different language.
      next
    end
    next unless host == hostname
    next if known_workers.include?(pid)
    log! "Pruning dead worker: #{worker}"
    worker.unregister_worker
  end
end
#queues ⇒ Object
Returns a list of queues to use when searching for a job. A splat (“*”) means you want every queue (in alpha order) - this can be useful for dynamically adding new queues.
# File 'lib/resque/worker.rb', line 299
def queues
  @queues.map do |queue|
    queue.strip!
    if (matched_queues = glob_match(queue)).empty?
      queue
    else
      matched_queues
    end
  end.flatten.uniq
end
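The expansion performed by #queues together with #glob_match can be sketched against a fixed list of queue names. `expand_queues` is a hypothetical helper and `known` stands in for `Resque.queues`; the matching itself uses `File.fnmatch?`, as the source does.

```ruby
# Expand each pattern to the sorted list of known queues it matches.
# A pattern that matches nothing is kept as a literal queue name.
# `known` is a hypothetical stand-in for Resque.queues.
def expand_queues(patterns, known)
  patterns.map do |pattern|
    matched = known.select { |queue| File.fnmatch?(pattern, queue) }.sort
    matched.empty? ? pattern : matched
  end.flatten.uniq
end

known = %w[mail images_small images_large]
puts expand_queues(%w[images_* mail], known).inspect
puts expand_queues(%w[*], known).inspect
puts expand_queues(%w[reports], known).inspect
```

Because the expansion is recomputed on every call, queues created after the worker started are picked up the next time the list is evaluated.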
#reconnect ⇒ Object
Reconnect to Redis to avoid sharing a connection with the parent, retry up to 3 times with increasing delay before giving up.
# File 'lib/resque/worker.rb', line 280
def reconnect
  tries = 0
  begin
    redis.client.reconnect
  rescue Redis::BaseConnectionError
    if (tries += 1) <= 3
      log "Error reconnecting to Redis; retrying"
      sleep(tries)
      retry
    else
      log "Error reconnecting to Redis; quitting"
      raise
    end
  end
end
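The retry-with-increasing-delay pattern used by #reconnect can be exercised without a Redis connection. In this sketch `with_retries` is a hypothetical helper, IOError stands in for Redis::BaseConnectionError, and the sleep is injectable so the example finishes instantly.

```ruby
# Run the block; on IOError retry up to `max_retries` more times, waiting
# 1, 2, 3... seconds between attempts. After that the error is re-raised.
# `sleeper` defaults to a real sleep but can be replaced for testing.
def with_retries(max_retries: 3, sleeper: ->(seconds) { sleep(seconds) })
  tries = 0
  begin
    yield
  rescue IOError
    if (tries += 1) <= max_retries
      sleeper.call(tries)
      retry
    else
      raise
    end
  end
end

delays = []
attempts = 0
result = with_retries(sleeper: ->(seconds) { delays << seconds }) do
  attempts += 1
  raise IOError, "connection lost" if attempts < 3
  :connected
end
puts "#{result} after #{attempts} attempts, waited #{delays.inspect}"
```

Note that "up to 3 retries" means up to four attempts in total: the initial one plus three retries.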
#register_signal_handlers ⇒ Object
Registers the various signal handlers a worker responds to.
TERM: Shutdown immediately, stop processing jobs.
INT: Shutdown immediately, stop processing jobs.
QUIT: Shutdown after the current job has finished processing.
USR1: Kill the forked child immediately, continue processing jobs.
USR2: Don’t process any new jobs.
CONT: Start processing jobs again after a USR2.
# File 'lib/resque/worker.rb', line 368
def register_signal_handlers
  trap('TERM') { shutdown! }
  trap('INT') { shutdown! }
  begin
    trap('QUIT') { shutdown }
    if term_child
      trap('USR1') { new_kill_child }
    else
      trap('USR1') { kill_child }
    end
    trap('USR2') { pause_processing }
    trap('CONT') { unpause_processing }
  rescue ArgumentError
    warn "Signals QUIT, USR1, USR2, and/or CONT not supported."
  end
  log! "Registered signals"
end
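The pause and resume signals can be tried in a standalone script on a Unix-like system. This is a reduced sketch, not the worker's handler set: `install_handlers` and the `flags` hash are hypothetical, and the handlers only flip flags that a work loop would check between jobs.

```ruby
# Install handlers that record the requested state in `flags`.
# USR2 pauses, CONT resumes, QUIT requests a graceful shutdown.
def install_handlers(flags)
  Signal.trap("USR2") { flags[:paused] = true }
  Signal.trap("CONT") { flags[:paused] = false }
  Signal.trap("QUIT") { flags[:shutdown] = true }
end

flags = { paused: false, shutdown: false }
install_handlers(flags)

Process.kill("USR2", Process.pid) # signal ourselves, as `kill -USR2 <pid>` would
sleep 0.1                         # give the handler a moment to run
puts flags.inspect

Process.kill("CONT", Process.pid)
sleep 0.1
puts flags.inspect
```

Keeping the handlers down to simple flag assignments mirrors #pause_processing and #shutdown, which only set instance variables and leave the real work to the main loop.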
#register_worker ⇒ Object
Registers ourself as a worker. Useful when entering the worker lifecycle on startup.
# File 'lib/resque/worker.rb', line 523
def register_worker
  redis.pipelined do
    redis.sadd(:workers, self)
    started!
  end
end
#report_failed_job(job, exception) ⇒ Object
Reports the exception and marks the job as failed
# File 'lib/resque/worker.rb', line 232
def report_failed_job(job,exception)
  log "#{job.inspect} failed: #{exception.inspect}"
  begin
    job.fail(exception)
  rescue Object => exception
    log "Received exception when reporting failure: #{exception.inspect}"
  end
  begin
    failed!
  rescue Object => exception
    log "Received exception when increasing failed jobs counter (redis issue) : #{exception.inspect}"
  end
end
#reserve ⇒ Object
Attempts to grab a job off one of the provided queues. Returns nil if no job can be found.
# File 'lib/resque/worker.rb', line 262
def reserve
  queues.each do |queue|
    log! "Checking #{queue}"
    if job = Resque.reserve(queue)
      log! "Found job on #{queue}"
      return job
    end
  end
  nil
rescue Exception => e
  log "Error reserving job: #{e.inspect}"
  log e.backtrace.join("\n")
  raise e
end
#run_hook(name, *args) ⇒ Object
Runs a named hook, passing along any arguments.
# File 'lib/resque/worker.rb', line 531
def run_hook(name, *args)
  return unless hooks = Resque.send(name)
  msg = "Running #{name} hooks"
  msg << " with #{args.inspect}" if args.any?
  log msg
  hooks.each do |hook|
    args.any? ? hook.call(*args) : hook.call
  end
end
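The dispatch step of #run_hook (call every registered hook, forwarding arguments only when there are any) can be sketched with plain lambdas. `run_hooks` is a hypothetical helper that takes the hook list directly instead of looking it up on Resque, and it omits the logging.

```ruby
# Call each hook in order, passing the arguments along when present.
# Returns nil when no hooks are registered.
def run_hooks(hooks, *args)
  return unless hooks
  hooks.each do |hook|
    args.any? ? hook.call(*args) : hook.call
  end
end

seen = []
before_fork = [
  ->(job) { seen << "closing connections for #{job}" },
  ->(job) { seen << "logging #{job}" }
]
run_hooks(before_fork, "job-1")
run_hooks([-> { seen << "before_first_fork" }])
puts seen.inspect
```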
#shutdown ⇒ Object
Schedule this worker for shutdown. Will finish processing the current job.
# File 'lib/resque/worker.rb', line 407
def shutdown
  log 'Exiting...'
  @shutdown = true
end
#shutdown! ⇒ Object
Kill the child and shutdown immediately. If not forking, abort this process.
# File 'lib/resque/worker.rb', line 414
def shutdown!
  shutdown
  if term_child
    if fork_per_job?
      new_kill_child
    else
      # Raise TermException in the same process
      trap('TERM') do
        # ignore subsequent terms
      end
      raise TermException.new("SIGTERM")
    end
  else
    kill_child
  end
end
#shutdown? ⇒ Boolean
Should this worker shut down as soon as the current job is finished?
# File 'lib/resque/worker.rb', line 432
def shutdown?
  @shutdown
end
#solaris_worker_pids ⇒ Object
Find Resque worker pids on Solaris.
Returns an Array of string pids of all the other workers on this machine. Useful when pruning dead workers on startup.
# File 'lib/resque/worker.rb', line 702
def solaris_worker_pids
  `ps -A -o pid,comm | grep "[r]uby" | grep -v "resque-web"`.split("\n").map do |line|
    real_pid = line.split(' ')[0]
    pargs_command = `pargs -a #{real_pid} 2>/dev/null | grep [r]esque | grep -v "resque-web"`
    if pargs_command.split(':')[1] == " resque-#{Resque::Version}"
      real_pid
    end
  end.compact
end
#started ⇒ Object
What time did this worker start? Returns the value stored in Redis by #started!, a string representation of a `Time`.
# File 'lib/resque/worker.rb', line 606
def started
  redis.get "worker:#{self}:started"
end
#started! ⇒ Object
Tell Redis we’ve started
# File 'lib/resque/worker.rb', line 611
def started!
  redis.set("worker:#{self}:started", Time.now.to_s)
end
#startup ⇒ Object
Runs all the methods needed when a worker begins its lifecycle.
# File 'lib/resque/worker.rb', line 339
def startup
  Kernel.warn "WARNING: This way of doing signal handling is now deprecated. Please see http://hone.heroku.com/resque/2012/08/21/resque-signals.html for more info." unless term_child or $TESTING
  enable_gc_optimizations
  register_signal_handlers
  prune_dead_workers
  run_hook :before_first_fork
  register_worker
  # Fix buffering so we can `rake resque:work > resque.log` and
  # get output from the child in there.
  $stdout.sync = true
end
#state ⇒ Object
Returns a symbol representing the current worker state, which can be either :working or :idle
# File 'lib/resque/worker.rb', line 641
def state
  redis.exists("worker:#{self}") ? :working : :idle
end
#unpause_processing ⇒ Object
Start processing jobs again after a pause
# File 'lib/resque/worker.rb', line 485
def unpause_processing
  log "CONT received; resuming job processing"
  @paused = false
end
#unregister_signal_handlers ⇒ Object
# File 'lib/resque/worker.rb', line 388
def unregister_signal_handlers
  trap('TERM') do
    trap('TERM') do
      # ignore subsequent terms
    end
    raise TermException.new("SIGTERM")
  end
  trap('INT', 'DEFAULT')
  begin
    trap('QUIT', 'DEFAULT')
    trap('USR1', 'DEFAULT')
    trap('USR2', 'DEFAULT')
  rescue ArgumentError
  end
end
#unregister_worker(exception = nil) ⇒ Object
Unregisters ourself as a worker. Useful when shutting down.
# File 'lib/resque/worker.rb', line 543
def unregister_worker(exception = nil)
  # If we're still processing a job, make sure it gets logged as a
  # failure.
  if (hash = processing) && !hash.empty?
    job = Job.new(hash['queue'], hash['payload'])
    # Ensure the proper worker is attached to this job, even if
    # it's not the precise instance that died.
    job.worker = self
    job.fail(exception || DirtyExit.new)
  end
  redis.pipelined do
    redis.srem(:workers, self)
    redis.del("worker:#{self}")
    redis.del("worker:#{self}:started")
    Stat.clear("processed:#{self}")
    Stat.clear("failed:#{self}")
  end
end
#validate_queues ⇒ Object
A worker must be given a queue, otherwise it won’t know what to do with itself.
You probably never need to call this.
# File 'lib/resque/worker.rb', line 139
def validate_queues
  if @queues.nil? || @queues.empty?
    raise NoQueueError.new("Please give each worker at least one queue.")
  end
end
#verbose ⇒ Object
# File 'lib/resque/worker.rb', line 735
def verbose
  logger_severity_deprecation_warning
  @verbose
end
#verbose=(value) ⇒ Object
# File 'lib/resque/worker.rb', line 745
def verbose=(value)
  logger_severity_deprecation_warning
  if value && !very_verbose
    Resque.logger.formatter = VerboseFormatter.new
  elsif !value
    Resque.logger.formatter = QuietFormatter.new
  end
  @verbose = value
end
#very_verbose ⇒ Object
# File 'lib/resque/worker.rb', line 740
def very_verbose
  logger_severity_deprecation_warning
  @very_verbose
end
#very_verbose=(value) ⇒ Object
# File 'lib/resque/worker.rb', line 757
def very_verbose=(value)
  logger_severity_deprecation_warning
  if value
    Resque.logger.formatter = VeryVerboseFormatter.new
  elsif !value && verbose
    Resque.logger.formatter = VerboseFormatter.new
  else
    Resque.logger.formatter = QuietFormatter.new
  end
  @very_verbose = value
end
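The deprecation warning points to setting the logger's level in place of the verbose and very_verbose flags. The effect of a level threshold can be sketched with Ruby's standard Logger, used here as a stand-in for `Resque.logger`; `capture_at` is a hypothetical helper. In this class #log writes at info and #log! at debug, so INFO and DEBUG are roughly the thresholds of interest.

```ruby
require "logger"
require "stringio"

# Log one debug and one info message at the given threshold and return
# whatever was written. The local Logger stands in for Resque.logger.
def capture_at(level)
  out = StringIO.new
  logger = Logger.new(out)
  logger.level = level
  logger.debug("Checking default") # the kind of message #log! emits
  logger.info("got: a job")        # the kind of message #log emits
  out.string
end

puts capture_at(Logger::INFO)  # info messages only
puts capture_at(Logger::DEBUG) # debug and info messages
```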
#will_fork? ⇒ Boolean
# File 'lib/resque/worker.rb', line 631
def will_fork?
  !@cant_fork && !$TESTING && fork_per_job?
end
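The forking decision combines three conditions: fork(2) has not failed, the process is not in test mode, and the FORK_PER_JOB environment variable is not the string "false". The sketch below restates that logic as standalone functions; the keyword arguments are hypothetical stand-ins for `@cant_fork`, `$TESTING`, and `ENV`.

```ruby
# Forking per job is on unless FORK_PER_JOB is exactly "false".
# `env` defaults to the real environment but accepts any Hash-like object.
def fork_per_job?(env = ENV)
  env["FORK_PER_JOB"] != "false"
end

# Fork only when the platform can, we are not testing, and forking per job
# has not been switched off.
def will_fork?(cant_fork:, testing:, env: ENV)
  !cant_fork && !testing && fork_per_job?(env)
end

puts will_fork?(cant_fork: false, testing: false, env: {})
puts will_fork?(cant_fork: false, testing: false, env: { "FORK_PER_JOB" => "false" })
```

Only the exact string "false" disables forking; an unset variable or any other value, such as "0" or "no", leaves it enabled.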
#windows_worker_pids ⇒ Object
Returns an Array of string pids of all the other workers on this machine. Useful when pruning dead workers on startup.
# File 'lib/resque/worker.rb', line 685
def windows_worker_pids
  tasklist_output = `tasklist /FI "IMAGENAME eq ruby.exe" /FO list`.encode("UTF-8", Encoding.locale_charmap)
  tasklist_output.split($/).select { |line| line =~ /^PID:/}.collect{ |line| line.gsub /PID:\s+/, '' }
end
#work(interval = 5.0, &block) ⇒ Object
This is the main workhorse method. Called on a Worker instance, it begins the worker life cycle.
The following events occur during a worker’s life cycle:
-
Startup: Signals are registered, dead workers are pruned, and this worker is registered.
-
Work loop: Jobs are pulled from a queue and processed.
-
Teardown: This worker is unregistered.
Can be passed a float representing the polling frequency. The default is 5 seconds, but for a semi-active site you may want to use a smaller value.
Also accepts a block which will be passed the job as soon as it has completed processing. Useful for testing.
# File 'lib/resque/worker.rb', line 161
def work(interval = 5.0, &block)
  interval = Float(interval)
  $0 = "resque: Starting"
  startup
  loop do
    break if shutdown?
    if not paused? and job = reserve
      log "got: #{job.inspect}"
      job.worker = self
      working_on job
      procline "Processing #{job.queue} since #{Time.now.to_i} [#{job.payload_class_name}]"
      if @child = fork(job)
        srand # Reseeding
        procline "Forked #{@child} at #{Time.now.to_i}"
        begin
          Process.waitpid(@child)
        rescue SystemCallError
          nil
        end
        job.fail(DirtyExit.new($?.to_s)) if $?.signaled?
      else
        unregister_signal_handlers if will_fork? && term_child
        begin
          reconnect
          perform(job, &block)
        rescue Exception => exception
          report_failed_job(job,exception)
        end
        if will_fork?
          run_at_exit_hooks ? exit : exit!
        end
      end
      done_working
      @child = nil
    else
      break if interval.zero?
      log! "Sleeping for #{interval} seconds"
      procline paused? ? "Paused" : "Waiting for #{@queues.join(',')}"
      sleep interval
    end
  end
  unregister_worker
rescue Exception => exception
  unless exception.class == SystemExit && !@child && run_at_exit_hooks
    log "Failed to start worker : #{exception.inspect}"
    unregister_worker(exception)
  end
end
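The three phases of the life cycle (startup, work loop, teardown) can be seen in a stripped-down loop. This is a sketch under heavy simplification, not the worker's implementation: `MiniWorker` is hypothetical, jobs are plain callables held in an array, and there is no forking, Redis, or signal handling.

```ruby
# A toy worker that records each life-cycle phase in `events`.
class MiniWorker
  attr_reader :events

  def initialize(jobs)
    @jobs = jobs
    @events = []
    @shutdown = false
  end

  def shutdown
    @shutdown = true
  end

  # With interval 0 the loop exits as soon as no job is available,
  # the same convention #work uses.
  def work(interval = 0)
    @events << :startup
    loop do
      break if @shutdown
      if (job = @jobs.shift)
        @events << [:processed, job.call]
      else
        break if interval.zero?
        sleep interval
      end
    end
    @events << :teardown
  end
end

worker = MiniWorker.new([-> { "email sent" }, -> { "image resized" }])
worker.work(0)
puts worker.events.inspect
```

Passing an interval of zero is what makes the loop drain the queue and return, which is convenient in tests; a positive interval makes the worker sleep and poll again.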
#worker_pids ⇒ Object
Returns an Array of string pids of all the other workers on this machine. Useful when pruning dead workers on startup.
# File 'lib/resque/worker.rb', line 673
def worker_pids
  if RUBY_PLATFORM =~ /solaris/
    solaris_worker_pids
  elsif RUBY_PLATFORM =~ /mingw32/
    windows_worker_pids
  else
    linux_worker_pids
  end
end
#working? ⇒ Boolean
Boolean - true if working, false if not
# File 'lib/resque/worker.rb', line 622
def working?
  state == :working
end
#working_on(job) ⇒ Object
Given a job, tells Redis we’re working on it. Useful for seeing what workers are doing and when.
# File 'lib/resque/worker.rb', line 566
def working_on(job)
  data = encode \
    :queue => job.queue,
    :run_at => Time.now.utc.iso8601,
    :payload => job.payload
  redis.set("worker:#{self}", data)
end
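The record written by #working_on is a JSON document with three keys: the queue name, a UTC ISO 8601 timestamp, and the job payload. Its shape can be sketched with the standard library alone. `working_on_payload` is a hypothetical helper, Ruby's bundled `json` replaces MultiJson, and the payload contents are invented for illustration.

```ruby
require "json"
require "time" # provides Time#iso8601

# Build the JSON string describing the job currently being worked on.
# `now` is injectable so the timestamp can be fixed in tests.
def working_on_payload(queue, payload, now = Time.now)
  JSON.generate(
    "queue"   => queue,
    "run_at"  => now.utc.iso8601,
    "payload" => payload
  )
end

data = working_on_payload("mail", { "class" => "Mailer", "args" => [42] })
puts data
puts JSON.parse(data)["run_at"]
```

Decoding this string back into a Hash is what #job does when reporting what a worker is currently processing.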