Class: Resque::Worker

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/resque/worker.rb

Overview

A Resque Worker processes jobs. On platforms that support fork(2), the worker will fork off a child to process each job. This ensures a clean slate when beginning the next job and cuts down on gradual memory growth as well as low level failures.

It also ensures workers are always listening to signals from you, their master, and can react accordingly.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Logging

debug, error, fatal, info, log, warn

Constructor Details

#initialize(*queues) ⇒ Worker

Workers should be initialized with an array of string queue names. The order is important: a Worker will check the first queue given for a job. If none is found, it will check the second queue name given. If a job is found, it will be processed. Upon completion, the Worker will again check the first queue given, and so forth. In this way the queue list passed to a Worker on startup defines the priorities of queues.

If passed a single “*”, this Worker will operate on all queues in alphabetical order. Queues can be dynamically added or removed without needing to restart workers using this method.



128
129
130
131
132
133
# File 'lib/resque/worker.rb', line 128

def initialize(*queues)
  @queues = queues.map { |queue| queue.to_s.strip }
  @shutdown = nil
  @paused = nil
  validate_queues
end

Instance Attribute Details

#cant_forkObject

Boolean indicating whether this worker can or can not fork. Automatically set if a fork(2) fails.



50
51
52
# File 'lib/resque/worker.rb', line 50

def cant_fork
  @cant_fork
end

#run_at_exit_hooksObject

When set to true, forked workers will exit with ‘exit`, calling any `at_exit` code handlers that have been registered in the application. Otherwise, forked workers exit with `exit!`



59
60
61
# File 'lib/resque/worker.rb', line 59

def run_at_exit_hooks
  @run_at_exit_hooks
end

#term_childObject

decide whether to use new_kill_child logic



55
56
57
# File 'lib/resque/worker.rb', line 55

def term_child
  @term_child
end

#term_timeoutObject

Returns the value of attribute term_timeout.



52
53
54
# File 'lib/resque/worker.rb', line 52

def term_timeout
  @term_timeout
end

#to_sObject Also known as: id

The string representation is the same as the id for this worker instance. Can be used with ‘Worker.find`.



656
657
658
# File 'lib/resque/worker.rb', line 656

def to_s
  @to_s ||= "#{hostname}:#{pid}:#{@queues.join(',')}"
end

Class Method Details

.allObject

Returns an array of all worker objects.



64
65
66
# File 'lib/resque/worker.rb', line 64

def self.all
  Array(redis.smembers(:workers)).map { |id| find(id) }.compact
end

.attach(worker_id) ⇒ Object

Alias of ‘find`



107
108
109
# File 'lib/resque/worker.rb', line 107

def self.attach(worker_id)
  find(worker_id)
end

.exists?(worker_id) ⇒ Boolean

Given a string worker id, return a boolean indicating whether the worker exists

Returns:

  • (Boolean)


113
114
115
# File 'lib/resque/worker.rb', line 113

def self.exists?(worker_id)
  redis.sismember(:workers, worker_id)
end

.find(worker_id) ⇒ Object

Returns a single worker object. Accepts a string id.



95
96
97
98
99
100
101
102
103
104
# File 'lib/resque/worker.rb', line 95

def self.find(worker_id)
  if exists? worker_id
    queues = worker_id.split(':')[-1].split(',')
    worker = new(*queues)
    worker.to_s = worker_id
    worker
  else
    nil
  end
end

.redisObject



19
20
21
# File 'lib/resque/worker.rb', line 19

def self.redis
  Resque.redis
end

.workingObject

Returns an array of all worker objects currently processing jobs.



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/resque/worker.rb', line 70

def self.working
  names = all
  return [] unless names.any?

  names.map! { |name| "worker:#{name}" }

  reportedly_working = {}

  begin
    reportedly_working = redis.mapped_mget(*names).reject do |key, value|
      value.nil? || value.empty?
    end
  rescue Redis::Distributed::CannotDistribute
    names.each do |name|
      value = redis.get name
      reportedly_working[name] = value unless value.nil? || value.empty?
    end
  end

  reportedly_working.keys.map do |key|
    find key.sub("worker:", '')
  end.compact
end

Instance Method Details

#==(other) ⇒ Object

Is this worker the same as another worker?



646
647
648
# File 'lib/resque/worker.rb', line 646

def ==(other)
  to_s == other.to_s
end

#decode(object) ⇒ Object

Given a string, returns a Ruby object.



34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/resque/worker.rb', line 34

def decode(object)
  return unless object

  begin
    if MultiJson.respond_to?(:dump) && MultiJson.respond_to?(:load)
      MultiJson.load object
    else
      MultiJson.decode object
    end
  rescue ::MultiJson::DecodeError => e
    raise DecodeException, e.message, e.backtrace
  end
end

#done_workingObject

Called when we are done working - clears our ‘working_on` state and tells Redis we processed a job.



576
577
578
579
580
581
# File 'lib/resque/worker.rb', line 576

def done_working
  redis.pipelined do
    processed!
    redis.del("worker:#{self}")
  end
end

#enable_gc_optimizationsObject

Enables GC Optimizations if you’re running REE. www.rubyenterpriseedition.com/faq.html#adapt_apps_for_cow



354
355
356
357
358
# File 'lib/resque/worker.rb', line 354

def enable_gc_optimizations
  if GC.respond_to?(:copy_on_write_friendly=)
    GC.copy_on_write_friendly = true
  end
end

#encode(object) ⇒ Object

Given a Ruby object, returns a string suitable for storage in a queue.



25
26
27
28
29
30
31
# File 'lib/resque/worker.rb', line 25

def encode(object)
  if MultiJson.respond_to?(:dump) && MultiJson.respond_to?(:load)
    MultiJson.dump object
  else
    MultiJson.encode object
  end
end

#failedObject

How many failed jobs has this worker seen? Returns an int.



595
596
597
# File 'lib/resque/worker.rb', line 595

def failed
  Stat["failed:#{self}"]
end

#failed!Object

Tells Redis we’ve failed a job.



600
601
602
603
# File 'lib/resque/worker.rb', line 600

def failed!
  Stat << "failed"
  Stat << "failed:#{self}"
end

#fork(job) ⇒ Object

Not every platform supports fork. Here we do our magic to determine if yours does.



318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
# File 'lib/resque/worker.rb', line 318

def fork(job)
  return if @cant_fork

  # Only run before_fork hooks if we're actually going to fork
  # (after checking @cant_fork)
  run_hook :before_fork, job

  begin
    # IronRuby doesn't support `Kernel.fork` yet
    if Kernel.respond_to?(:fork)
      Kernel.fork if will_fork?
    else
      raise NotImplementedError
    end
  rescue NotImplementedError
    @cant_fork = true
    nil
  end
end

#fork_per_job?Boolean

Returns:

  • (Boolean)


635
636
637
# File 'lib/resque/worker.rb', line 635

def fork_per_job?
  ENV["FORK_PER_JOB"] != 'false'
end

#glob_match(pattern) ⇒ Object



310
311
312
313
314
# File 'lib/resque/worker.rb', line 310

def glob_match(pattern)
  Resque.queues.select do |queue|
    File.fnmatch?(pattern, queue)
  end.sort
end

#hostnameObject

chomp’d hostname of this machine



662
663
664
# File 'lib/resque/worker.rb', line 662

def hostname
  Socket.gethostname
end

#idle?Boolean

Boolean - true if idle, false if not

Returns:

  • (Boolean)


627
628
629
# File 'lib/resque/worker.rb', line 627

def idle?
  state == :idle
end

#inspectObject



650
651
652
# File 'lib/resque/worker.rb', line 650

def inspect
  "#<Worker #{to_s}>"
end

#jobObject Also known as: processing

Returns a hash explaining the Job we’re currently processing, if any.



616
617
618
# File 'lib/resque/worker.rb', line 616

def job
  decode(redis.get("worker:#{self}")) || {}
end

#kill_childObject

Kills the forked child immediately, without remorse. The job it is processing will not be completed.



438
439
440
441
442
443
444
445
446
447
448
# File 'lib/resque/worker.rb', line 438

def kill_child
  if @child
    log! "Killing child at #{@child}"
    if `ps -o pid,state -p #{@child}`
      Process.kill("KILL", @child) rescue nil
    else
      log! "Child #{@child} not found, restarting."
      shutdown
    end
  end
end

#linux_worker_pidsObject

Find Resque worker pids on Linux and OS X.



692
693
694
695
696
# File 'lib/resque/worker.rb', line 692

def linux_worker_pids
  `ps -A -o pid,command | grep "[r]esque" | grep -v "resque-web"`.split("\n").map do |line|
    line.split(' ')[0]
  end
end

#log(message) ⇒ Object

Log a message to Resque.logger can’t use alias_method since info/debug are private methods



722
723
724
# File 'lib/resque/worker.rb', line 722

def log(message)
  info(message)
end

#log!(message) ⇒ Object



726
727
728
# File 'lib/resque/worker.rb', line 726

def log!(message)
  debug(message)
end

#logger_severity_deprecation_warningObject



770
771
772
773
774
775
776
777
# File 'lib/resque/worker.rb', line 770

def logger_severity_deprecation_warning
  return if $TESTING
  return if $warned_logger_severity_deprecation
  Kernel.warn "*** DEPRECATION WARNING: Resque::Worker#verbose and #very_verbose are deprecated. Please set Resque.logger.level instead"
  Kernel.warn "Called from: #{caller[0..5].join("\n\t")}"
  $warned_logger_severity_deprecation = true
  nil
end

#new_kill_childObject

Kills the forked child immediately with minimal remorse. The job it is processing will not be completed. Send the child a TERM signal, wait 5 seconds, and then a KILL signal if it has not quit



453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
# File 'lib/resque/worker.rb', line 453

def new_kill_child
  if @child
    unless Process.waitpid(@child, Process::WNOHANG)
      log! "Sending TERM signal to child #{@child}"
      Process.kill("TERM", @child)
      (term_timeout.to_f * 10).round.times do |i|
        sleep(0.1)
        return if Process.waitpid(@child, Process::WNOHANG)
      end
      log! "Sending KILL signal to child #{@child}"
      Process.kill("KILL", @child)
    else
      log! "Child #{@child} already quit."
    end
  end
rescue SystemCallError
  log! "Child #{@child} already quit and reaped."
end

#pause_processingObject

Stop processing jobs after the current one has completed (if we’re currently running one).



479
480
481
482
# File 'lib/resque/worker.rb', line 479

def pause_processing
  log "USR2 received; pausing job processing"
  @paused = true
end

#paused?Boolean

are we paused?

Returns:

  • (Boolean)


473
474
475
# File 'lib/resque/worker.rb', line 473

def paused?
  @paused
end

#perform(job) ⇒ Object

Processes a given job in the child.



247
248
249
250
251
252
253
254
255
256
257
258
# File 'lib/resque/worker.rb', line 247

def perform(job)
  begin
    run_hook :after_fork, job if will_fork?
    job.perform
  rescue Object => e
    report_failed_job(job,e)
  else
    log "done: #{job.inspect}"
  ensure
    yield job if block_given?
  end
end

#pidObject

Returns Integer PID of running worker



667
668
669
# File 'lib/resque/worker.rb', line 667

def pid
  @pid ||= Process.pid
end

#process(job = nil, &block) ⇒ Object

DEPRECATED. Processes a single job. If none is given, it will try to produce one. Usually run in the child.



221
222
223
224
225
226
227
228
229
# File 'lib/resque/worker.rb', line 221

def process(job = nil, &block)
  return unless job ||= reserve

  job.worker = self
  working_on job
  perform(job, &block)
ensure
  done_working
end

#processedObject

How many jobs has this worker processed? Returns an int.



584
585
586
# File 'lib/resque/worker.rb', line 584

def processed
  Stat["processed:#{self}"]
end

#processed!Object

Tell Redis we’ve processed a job.



589
590
591
592
# File 'lib/resque/worker.rb', line 589

def processed!
  Stat << "processed"
  Stat << "processed:#{self}"
end

#procline(string) ⇒ Object

Given a string, sets the procline ($0) and logs. Procline is always in the format of:

resque-VERSION: STRING


715
716
717
718
# File 'lib/resque/worker.rb', line 715

def procline(string)
  $0 = "resque-#{Resque::Version}: #{string}"
  log! $0
end

#prune_dead_workersObject

Looks for any workers which should be running on this server and, if they’re not, removes them from Redis.

This is a form of garbage collection. If a server is killed by a hard shutdown, power failure, or something else beyond our control, the Resque workers will not die gracefully and therefore will leave stale state information in Redis.

By checking the current Redis state against the actual environment, we can determine if Redis is old and clean it up a bit.



500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
# File 'lib/resque/worker.rb', line 500

def prune_dead_workers
  all_workers = Worker.all
  known_workers = worker_pids unless all_workers.empty?
  all_workers.each do |worker|
    host, pid, worker_queues_raw = worker.id.split(':')
    worker_queues = worker_queues_raw.split(",")
    unless @queues.include?("*") || (worker_queues.to_set == @queues.to_set)
      # If the worker we are trying to prune does not belong to the queues
      # we are listening to, we should not touch it. 
      # Attempt to prune a worker from different queues may easily result in
      # an unknown class exception, since that worker could easily be even 
      # written in different language.
      next
    end
    next unless host == hostname
    next if known_workers.include?(pid)
    log! "Pruning dead worker: #{worker}"
    worker.unregister_worker
  end
end

#queuesObject

Returns a list of queues to use when searching for a job. A splat (“*”) means you want every queue (in alpha order) - this can be useful for dynamically adding new queues.



299
300
301
302
303
304
305
306
307
308
# File 'lib/resque/worker.rb', line 299

def queues
  @queues.map do |queue|
    queue.strip!
    if (matched_queues = glob_match(queue)).empty?
      queue
    else
      matched_queues
    end
  end.flatten.uniq
end

#reconnectObject

Reconnect to Redis to avoid sharing a connection with the parent, retry up to 3 times with increasing delay before giving up.



280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
# File 'lib/resque/worker.rb', line 280

def reconnect
  tries = 0
  begin
    redis.client.reconnect
  rescue Redis::BaseConnectionError
    if (tries += 1) <= 3
      log "Error reconnecting to Redis; retrying"
      sleep(tries)
      retry
    else
      log "Error reconnecting to Redis; quitting"
      raise
    end
  end
end

#redisObject



15
16
17
# File 'lib/resque/worker.rb', line 15

def redis
  Resque.redis
end

#register_signal_handlersObject

Registers the various signal handlers a worker responds to.

TERM: Shutdown immediately, stop processing jobs.

INT: Shutdown immediately, stop processing jobs.

QUIT: Shutdown after the current job has finished processing. USR1: Kill the forked child immediately, continue processing jobs. USR2: Don’t process any new jobs CONT: Start processing jobs again after a USR2



368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
# File 'lib/resque/worker.rb', line 368

def register_signal_handlers
  trap('TERM') { shutdown!  }
  trap('INT')  { shutdown!  }

  begin
    trap('QUIT') { shutdown   }
    if term_child
      trap('USR1') { new_kill_child }
    else
      trap('USR1') { kill_child }
    end
    trap('USR2') { pause_processing }
    trap('CONT') { unpause_processing }
  rescue ArgumentError
    warn "Signals QUIT, USR1, USR2, and/or CONT not supported."
  end

  log! "Registered signals"
end

#register_workerObject

Registers ourself as a worker. Useful when entering the worker lifecycle on startup.



523
524
525
526
527
528
# File 'lib/resque/worker.rb', line 523

def register_worker
  redis.pipelined do
    redis.sadd(:workers, self)
    started!
  end
end

#report_failed_job(job, exception) ⇒ Object

Reports the exception and marks the job as failed



232
233
234
235
236
237
238
239
240
241
242
243
244
# File 'lib/resque/worker.rb', line 232

def report_failed_job(job,exception)
  log "#{job.inspect} failed: #{exception.inspect}"
  begin
    job.fail(exception)
  rescue Object => exception
    log "Received exception when reporting failure: #{exception.inspect}"
  end
  begin
    failed!
  rescue Object => exception
    log "Received exception when increasing failed jobs counter (redis issue) : #{exception.inspect}"
  end
end

#reserveObject

Attempts to grab a job off one of the provided queues. Returns nil if no job can be found.



262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
# File 'lib/resque/worker.rb', line 262

def reserve
  queues.each do |queue|
    log! "Checking #{queue}"
    if job = Resque.reserve(queue)
      log! "Found job on #{queue}"
      return job
    end
  end

  nil
rescue Exception => e
  log "Error reserving job: #{e.inspect}"
  log e.backtrace.join("\n")
  raise e
end

#run_hook(name, *args) ⇒ Object

Runs a named hook, passing along any arguments.



531
532
533
534
535
536
537
538
539
540
# File 'lib/resque/worker.rb', line 531

def run_hook(name, *args)
  return unless hooks = Resque.send(name)
  msg = "Running #{name} hooks"
  msg << " with #{args.inspect}" if args.any?
  log msg

  hooks.each do |hook|
    args.any? ? hook.call(*args) : hook.call
  end
end

#shutdownObject

Schedule this worker for shutdown. Will finish processing the current job.



407
408
409
410
# File 'lib/resque/worker.rb', line 407

def shutdown
  log 'Exiting...'
  @shutdown = true
end

#shutdown!Object

Kill the child and shutdown immediately. If not forking, abort this process.



414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
# File 'lib/resque/worker.rb', line 414

def shutdown!
  shutdown
  if term_child
    if fork_per_job?
      new_kill_child
    else
      # Raise TermException in the same process
      trap('TERM') do
        # ignore subsequent terms
      end
      raise TermException.new("SIGTERM")
    end
  else
    kill_child
  end
end

#shutdown?Boolean

Should this worker shutdown as soon as current job is finished?

Returns:

  • (Boolean)


432
433
434
# File 'lib/resque/worker.rb', line 432

def shutdown?
  @shutdown
end

#solaris_worker_pidsObject

Find Resque worker pids on Solaris.

Returns an Array of string pids of all the other workers on this machine. Useful when pruning dead workers on startup.



702
703
704
705
706
707
708
709
710
# File 'lib/resque/worker.rb', line 702

def solaris_worker_pids
  `ps -A -o pid,comm | grep "[r]uby" | grep -v "resque-web"`.split("\n").map do |line|
    real_pid = line.split(' ')[0]
    pargs_command = `pargs -a #{real_pid} 2>/dev/null | grep [r]esque | grep -v "resque-web"`
    if pargs_command.split(':')[1] == " resque-#{Resque::Version}"
      real_pid
    end
  end.compact
end

#startedObject

What time did this worker start? Returns an instance of ‘Time`



606
607
608
# File 'lib/resque/worker.rb', line 606

def started
  redis.get "worker:#{self}:started"
end

#started!Object

Tell Redis we’ve started



611
612
613
# File 'lib/resque/worker.rb', line 611

def started!
  redis.set("worker:#{self}:started", Time.now.to_s)
end

#startupObject

Runs all the methods needed when a worker begins its lifecycle.



339
340
341
342
343
344
345
346
347
348
349
350
# File 'lib/resque/worker.rb', line 339

def startup
  Kernel.warn "WARNING: This way of doing signal handling is now deprecated. Please see http://hone.heroku.com/resque/2012/08/21/resque-signals.html for more info." unless term_child or $TESTING
  enable_gc_optimizations
  register_signal_handlers
  prune_dead_workers
  run_hook :before_first_fork
  register_worker

  # Fix buffering so we can `rake resque:work > resque.log` and
  # get output from the child in there.
  $stdout.sync = true
end

#stateObject

Returns a symbol representing the current worker state, which can be either :working or :idle



641
642
643
# File 'lib/resque/worker.rb', line 641

def state
  redis.exists("worker:#{self}") ? :working : :idle
end

#unpause_processingObject

Start processing jobs again after a pause



485
486
487
488
# File 'lib/resque/worker.rb', line 485

def unpause_processing
  log "CONT received; resuming job processing"
  @paused = false
end

#unregister_signal_handlersObject



388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
# File 'lib/resque/worker.rb', line 388

def unregister_signal_handlers
  trap('TERM') do
    trap ('TERM') do 
      # ignore subsequent terms               
    end  
    raise TermException.new("SIGTERM") 
  end 
  trap('INT', 'DEFAULT')

  begin
    trap('QUIT', 'DEFAULT')
    trap('USR1', 'DEFAULT')
    trap('USR2', 'DEFAULT')
  rescue ArgumentError
  end
end

#unregister_worker(exception = nil) ⇒ Object

Unregisters ourself as a worker. Useful when shutting down.



543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
# File 'lib/resque/worker.rb', line 543

def unregister_worker(exception = nil)
  # If we're still processing a job, make sure it gets logged as a
  # failure.
  if (hash = processing) && !hash.empty?
    job = Job.new(hash['queue'], hash['payload'])
    # Ensure the proper worker is attached to this job, even if
    # it's not the precise instance that died.
    job.worker = self
    job.fail(exception || DirtyExit.new)
  end

  redis.pipelined do
    redis.srem(:workers, self)
    redis.del("worker:#{self}")
    redis.del("worker:#{self}:started")

    Stat.clear("processed:#{self}")
    Stat.clear("failed:#{self}")
  end
end

#validate_queuesObject

A worker must be given a queue, otherwise it won’t know what to do with itself.

You probably never need to call this.



139
140
141
142
143
# File 'lib/resque/worker.rb', line 139

def validate_queues
  if @queues.nil? || @queues.empty?
    raise NoQueueError.new("Please give each worker at least one queue.")
  end
end

#verboseObject

Deprecated legacy methods for controlling the logging threshhold Use Resque.logger.level now, e.g.:

Resque.logger.level = Logger::DEBUG


735
736
737
738
# File 'lib/resque/worker.rb', line 735

def verbose
  logger_severity_deprecation_warning
  @verbose
end

#verbose=(value) ⇒ Object



745
746
747
748
749
750
751
752
753
754
755
# File 'lib/resque/worker.rb', line 745

def verbose=(value);
  logger_severity_deprecation_warning

  if value && !very_verbose
    Resque.logger.formatter = VerboseFormatter.new
  elsif !value
    Resque.logger.formatter = QuietFormatter.new
  end

  @verbose = value
end

#very_verboseObject



740
741
742
743
# File 'lib/resque/worker.rb', line 740

def very_verbose
  logger_severity_deprecation_warning
  @very_verbose
end

#very_verbose=(value) ⇒ Object



757
758
759
760
761
762
763
764
765
766
767
768
# File 'lib/resque/worker.rb', line 757

def very_verbose=(value)
  logger_severity_deprecation_warning
  if value
    Resque.logger.formatter = VeryVerboseFormatter.new
  elsif !value && verbose
    Resque.logger.formatter = VerboseFormatter.new
  else
    Resque.logger.formatter = QuietFormatter.new
  end

  @very_verbose = value
end

#will_fork?Boolean

Returns:

  • (Boolean)


631
632
633
# File 'lib/resque/worker.rb', line 631

def will_fork?
  !@cant_fork && !$TESTING && fork_per_job?
end

#windows_worker_pidsObject

Returns an Array of string pids of all the other workers on this machine. Useful when pruning dead workers on startup.



685
686
687
688
# File 'lib/resque/worker.rb', line 685

def windows_worker_pids
  tasklist_output = `tasklist /FI "IMAGENAME eq ruby.exe" /FO list`.encode("UTF-8", Encoding.locale_charmap)
  tasklist_output.split($/).select { |line| line =~ /^PID:/}.collect{ |line| line.gsub /PID:\s+/, '' }
end

#work(interval = 5.0, &block) ⇒ Object

This is the main workhorse method. Called on a Worker instance, it begins the worker life cycle.

The following events occur during a worker’s life cycle:

  1. Startup: Signals are registered, dead workers are pruned,

    and this worker is registered.
    
  2. Work loop: Jobs are pulled from a queue and processed.

  3. Teardown: This worker is unregistered.

Can be passed a float representing the polling frequency. The default is 5 seconds, but for a semi-active site you may want to use a smaller value.

Also accepts a block which will be passed the job as soon as it has completed processing. Useful for testing.



161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/resque/worker.rb', line 161

def work(interval = 5.0, &block)
  interval = Float(interval)
  $0 = "resque: Starting"
  startup

  loop do
    break if shutdown?

    if not paused? and job = reserve
      log "got: #{job.inspect}"
      job.worker = self
      working_on job

      procline "Processing #{job.queue} since #{Time.now.to_i} [#{job.payload_class_name}]"
      if @child = fork(job)
        srand # Reseeding
        procline "Forked #{@child} at #{Time.now.to_i}"
        begin
          Process.waitpid(@child)
        rescue SystemCallError
          nil
        end
        job.fail(DirtyExit.new($?.to_s)) if $?.signaled?
      else
        unregister_signal_handlers if will_fork? && term_child
        begin

          reconnect
          perform(job, &block)

        rescue Exception => exception
          report_failed_job(job,exception)
        end

        if will_fork?
          run_at_exit_hooks ? exit : exit!
        end
      end

      done_working
      @child = nil
    else
      break if interval.zero?
      log! "Sleeping for #{interval} seconds"
      procline paused? ? "Paused" : "Waiting for #{@queues.join(',')}"
      sleep interval
    end
  end

  unregister_worker
rescue Exception => exception
  unless exception.class == SystemExit && !@child && run_at_exit_hooks
    log "Failed to start worker : #{exception.inspect}"

    unregister_worker(exception)
  end
end

#worker_pidsObject

Returns an Array of string pids of all the other workers on this machine. Useful when pruning dead workers on startup.



673
674
675
676
677
678
679
680
681
# File 'lib/resque/worker.rb', line 673

def worker_pids
  if RUBY_PLATFORM =~ /solaris/
    solaris_worker_pids
  elsif RUBY_PLATFORM =~ /mingw32/
    windows_worker_pids
  else
    linux_worker_pids
  end
end

#working?Boolean

Boolean - true if working, false if not

Returns:

  • (Boolean)


622
623
624
# File 'lib/resque/worker.rb', line 622

def working?
  state == :working
end

#working_on(job) ⇒ Object

Given a job, tells Redis we’re working on it. Useful for seeing what workers are doing and when.



566
567
568
569
570
571
572
# File 'lib/resque/worker.rb', line 566

def working_on(job)
  data = encode \
    :queue   => job.queue,
    :run_at  => Time.now.utc.iso8601,
    :payload => job.payload
  redis.set("worker:#{self}", data)
end