Class: Resque::Worker

Inherits:
Object
  • Object
show all
Extended by:
Helpers
Includes:
Helpers, Logging
Defined in:
lib/resque/worker.rb

Overview

A Resque Worker processes jobs. On platforms that support fork(2), the worker will fork off a child to process each job. This ensures a clean slate when beginning the next job and cuts down on gradual memory growth as well as low level failures.

It also ensures workers are always listening to signals from you, their master, and can react accordingly.

Constant Summary collapse

WORKER_HEARTBEAT_KEY =
"workers:heartbeat"

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Helpers

classify, constantize

Methods included from Logging

debug, error, fatal, info, log, warn

Constructor Details

#initialize(*queues) ⇒ Worker

Workers should be initialized with an array of string queue names. The order is important: a Worker will check the first queue given for a job. If none is found, it will check the second queue name given. If a job is found, it will be processed. Upon completion, the Worker will again check the first queue given, and so forth. In this way the queue list passed to a Worker on startup defines the priorities of queues.

If passed a single “*”, this Worker will operate on all queues in alphabetical order. Queues can be dynamically added or removed without needing to restart workers using this method.



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/resque/worker.rb', line 125

def initialize(*queues)
  @shutdown = nil
  @paused = nil
  @before_first_fork_hook_ran = false

  self.verbose = ENV['LOGGING'] || ENV['VERBOSE']
  self.very_verbose = ENV['VVERBOSE']
  self.term_timeout = ENV['RESQUE_TERM_TIMEOUT'] || 4.0
  self.term_child = ENV['TERM_CHILD']
  self.graceful_term = ENV['GRACEFUL_TERM']
  self.run_at_exit_hooks = ENV['RUN_AT_EXIT_HOOKS']

  if ENV['BACKGROUND']
    unless Process.respond_to?('daemon')
        abort "env var BACKGROUND is set, which requires ruby >= 1.9"
    end
    Process.daemon(true)
    self.reconnect
  end

  if ENV['PIDFILE']
    File.open(ENV['PIDFILE'], 'w') { |f| f << pid }
  end

  self.queues = queues
end

Instance Attribute Details

#graceful_termObject

should term kill workers gracefully (vs. immediately) Makes SIGTERM work like SIGQUIT



45
46
47
# File 'lib/resque/worker.rb', line 45

def graceful_term
  @graceful_term
end

#job(reload = true) ⇒ Object Also known as: processing

Returns a hash explaining the Job we’re currently processing, if any.



721
722
723
724
# File 'lib/resque/worker.rb', line 721

def job(reload = true)
  @job = nil if reload
  @job ||= decode(redis.get("worker:#{self}")) || {}
end

#pidObject

Returns Integer PID of running worker



774
775
776
# File 'lib/resque/worker.rb', line 774

def pid
  @pid ||= Process.pid
end

#run_at_exit_hooksObject

When set to true, forked workers will exit with ‘exit`, calling any `at_exit` code handlers that have been registered in the application. Otherwise, forked workers exit with `exit!`



49
50
51
# File 'lib/resque/worker.rb', line 49

def run_at_exit_hooks
  @run_at_exit_hooks
end

#term_childObject

decide whether to use new_kill_child logic



41
42
43
# File 'lib/resque/worker.rb', line 41

def term_child
  @term_child
end

#term_timeoutObject

Returns the value of attribute term_timeout.



38
39
40
# File 'lib/resque/worker.rb', line 38

def term_timeout
  @term_timeout
end

#to_sObject Also known as: id

The string representation is the same as the id for this worker instance. Can be used with ‘Worker.find`.



763
764
765
# File 'lib/resque/worker.rb', line 763

def to_s
  @to_s ||= "#{hostname}:#{pid}:#{@queues.join(',')}"
end

Class Method Details

.allObject

Returns an array of all worker objects.



55
56
57
# File 'lib/resque/worker.rb', line 55

def self.all
  Array(redis.smembers(:workers)).map { |id| find(id, :skip_exists => true) }.compact
end

.all_heartbeatsObject



475
476
477
# File 'lib/resque/worker.rb', line 475

def self.all_heartbeats
  redis.hgetall(WORKER_HEARTBEAT_KEY)
end

.all_workers_with_expired_heartbeatsObject

Returns a list of workers that have sent a heartbeat in the past, but which already expired (does NOT include workers that have never sent a heartbeat at all).



481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
# File 'lib/resque/worker.rb', line 481

def self.all_workers_with_expired_heartbeats
  workers = Worker.all
  heartbeats = Worker.all_heartbeats

  workers.select do |worker|
    id = worker.to_s
    heartbeat = heartbeats[id]

    if heartbeat
      seconds_since_heartbeat = (Time.now - Time.parse(heartbeat)).to_i
      seconds_since_heartbeat > Resque.prune_interval
    else
      false
    end
  end
end

.attach(worker_id) ⇒ Object

Alias of ‘find`



104
105
106
# File 'lib/resque/worker.rb', line 104

def self.attach(worker_id)
  find(worker_id)
end

.exists?(worker_id) ⇒ Boolean

Given a string worker id, return a boolean indicating whether the worker exists

Returns:

  • (Boolean)


110
111
112
# File 'lib/resque/worker.rb', line 110

def self.exists?(worker_id)
  redis.sismember(:workers, worker_id)
end

.find(worker_id, options = {}) ⇒ Object

Returns a single worker object. Accepts a string id.



88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/resque/worker.rb', line 88

def self.find(worker_id, options = {})
  skip_exists = options[:skip_exists]

  if skip_exists || exists?(worker_id)
    host, pid, queues_raw = worker_id.split(':')
    queues = queues_raw.split(',')
    worker = new(*queues)
    worker.to_s = worker_id
    worker.pid = pid.to_i
    worker
  else
    nil
  end
end

.redisObject



23
24
25
# File 'lib/resque/worker.rb', line 23

def self.redis
  Resque.redis
end

.workingObject

Returns an array of all worker objects currently processing jobs.



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/resque/worker.rb', line 61

def self.working
  names = all
  return [] unless names.any?

  names.map! { |name| "worker:#{name}" }

  reportedly_working = {}

  begin
    reportedly_working = redis.mapped_mget(*names).reject do |key, value|
      value.nil? || value.empty?
    end
  rescue Redis::Distributed::CannotDistribute
    names.each do |name|
      value = redis.get name
      reportedly_working[name] = value unless value.nil? || value.empty?
    end
  end

  reportedly_working.keys.map do |key|
    worker = find(key.sub("worker:", ''), :skip_exists => true)
    worker.job = worker.decode(reportedly_working[key])
    worker
  end.compact
end

Instance Method Details

#==(other) ⇒ Object

Is this worker the same as another worker?



753
754
755
# File 'lib/resque/worker.rb', line 753

def ==(other)
  to_s == other.to_s
end

#decode(object) ⇒ Object

Given a string, returns a Ruby object.



34
35
36
# File 'lib/resque/worker.rb', line 34

def decode(object)
  Resque.decode(object)
end

#done_workingObject

Called when we are done working - clears our ‘working_on` state and tells Redis we processed a job.



681
682
683
684
685
686
# File 'lib/resque/worker.rb', line 681

def done_working
  redis.pipelined do
    processed!
    redis.del("worker:#{self}")
  end
end

#enable_gc_optimizationsObject

Enables GC Optimizations if you’re running REE. www.rubyenterpriseedition.com/faq.html#adapt_apps_for_cow



374
375
376
377
378
# File 'lib/resque/worker.rb', line 374

def enable_gc_optimizations
  if GC.respond_to?(:copy_on_write_friendly=)
    GC.copy_on_write_friendly = true
  end
end

#encode(object) ⇒ Object

Given a Ruby object, returns a string suitable for storage in a queue.



29
30
31
# File 'lib/resque/worker.rb', line 29

def encode(object)
  Resque.encode(object)
end

#failedObject

How many failed jobs has this worker seen? Returns an int.



700
701
702
# File 'lib/resque/worker.rb', line 700

def failed
  Stat["failed:#{self}"]
end

#failed!Object

Tells Redis we’ve failed a job.



705
706
707
708
# File 'lib/resque/worker.rb', line 705

def failed!
  Stat << "failed"
  Stat << "failed:#{self}"
end

#fork(job) ⇒ Object

Not every platform supports fork. Here we do our magic to determine if yours does.



338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
# File 'lib/resque/worker.rb', line 338

def fork(job)
  return unless will_fork?

  # Only run before_fork hooks if we're actually going to fork
  # (after checking will_fork?)
  run_hook :before_fork, job

  begin
    # IronRuby doesn't support `Kernel.fork` yet
    if Kernel.respond_to?(:fork)
      Kernel.fork
    else
      raise NotImplementedError
    end
  rescue NotImplementedError
    @cant_fork = true
    nil
  end
end

#fork_per_job?Boolean

Returns:

  • (Boolean)


742
743
744
# File 'lib/resque/worker.rb', line 742

def fork_per_job?
  ENV["FORK_PER_JOB"] != 'false'
end

#glob_match(pattern) ⇒ Object



179
180
181
182
183
# File 'lib/resque/worker.rb', line 179

def glob_match(pattern)
  Resque.queues.select do |queue|
    File.fnmatch?(pattern, queue)
  end.sort
end

#heartbeatObject



470
471
472
473
# File 'lib/resque/worker.rb', line 470

def heartbeat
  heartbeat = redis.hget(WORKER_HEARTBEAT_KEY, to_s)
  heartbeat && Time.parse(heartbeat)
end

#heartbeat!(time = Time.now) ⇒ Object



498
499
500
# File 'lib/resque/worker.rb', line 498

def heartbeat!(time = Time.now)
  redis.hset(WORKER_HEARTBEAT_KEY, to_s, time.iso8601)
end

#hostnameObject

chomp’d hostname of this machine



769
770
771
# File 'lib/resque/worker.rb', line 769

def hostname
  Socket.gethostname
end

#idle?Boolean

Boolean - true if idle, false if not

Returns:

  • (Boolean)


734
735
736
# File 'lib/resque/worker.rb', line 734

def idle?
  state == :idle
end

#inspectObject



757
758
759
# File 'lib/resque/worker.rb', line 757

def inspect
  "#<Worker #{to_s}>"
end

#kill_background_threadsObject



627
628
629
# File 'lib/resque/worker.rb', line 627

def kill_background_threads
  @heart.kill if @heart
end

#kill_childObject

Kills the forked child immediately, without remorse. The job it is processing will not be completed.



458
459
460
461
462
463
464
465
466
467
468
# File 'lib/resque/worker.rb', line 458

def kill_child
  if @child
    log_with_severity :debug, "Killing child at #{@child}"
    if `ps -o pid,state -p #{@child}`
      Process.kill("KILL", @child) rescue nil
    else
      log_with_severity :debug, "Child #{@child} not found, restarting."
      shutdown
    end
  end
end

#linux_worker_pidsObject

Find Resque worker pids on Linux and OS X.



799
800
801
802
803
# File 'lib/resque/worker.rb', line 799

def linux_worker_pids
  `ps -A -o pid,command | grep -E "[r]esque:work|[r]esque-[0-9]" | grep -v "resque-web"`.split("\n").map do |line|
    line.split(' ')[0]
  end
end

#log(message) ⇒ Object



827
828
829
# File 'lib/resque/worker.rb', line 827

def log(message)
  info(message)
end

#log!(message) ⇒ Object



831
832
833
# File 'lib/resque/worker.rb', line 831

def log!(message)
  debug(message)
end

#new_kill_childObject

Kills the forked child immediately with minimal remorse. The job it is processing will not be completed. Send the child a TERM signal, wait 5 seconds, and then a KILL signal if it has not quit



515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
# File 'lib/resque/worker.rb', line 515

def new_kill_child
  if @child
    unless Process.waitpid(@child, Process::WNOHANG)
      log_with_severity :debug, "Sending TERM signal to child #{@child}"
      Process.kill("TERM", @child)
      (term_timeout.to_f * 10).round.times do |i|
        sleep(0.1)
        return if Process.waitpid(@child, Process::WNOHANG)
      end
      log_with_severity :debug, "Sending KILL signal to child #{@child}"
      Process.kill("KILL", @child)
    else
      log_with_severity :debug, "Child #{@child} already quit."
    end
  end
rescue SystemCallError
  log_with_severity :error, "Child #{@child} already quit and reaped."
end

#pause_processingObject

Stop processing jobs after the current one has completed (if we’re currently running one).



541
542
543
544
545
# File 'lib/resque/worker.rb', line 541

def pause_processing
  log_with_severity :info, "USR2 received; pausing job processing"
  run_hook :before_pause, self
  @paused = true
end

#paused?Boolean

are we paused?

Returns:

  • (Boolean)


535
536
537
# File 'lib/resque/worker.rb', line 535

def paused?
  @paused
end

#perform(job) ⇒ Object

Processes a given job in the child.



287
288
289
290
291
292
293
294
295
296
297
298
# File 'lib/resque/worker.rb', line 287

def perform(job)
  begin
    run_hook :after_fork, job if will_fork?
    job.perform
  rescue Object => e
    report_failed_job(job,e)
  else
    log_with_severity :info, "done: #{job.inspect}"
  ensure
    yield job if block_given?
  end
end

#process(job = nil, &block) ⇒ Object

DEPRECATED. Processes a single job. If none is given, it will try to produce one. Usually run in the child.



261
262
263
264
265
266
267
268
269
# File 'lib/resque/worker.rb', line 261

def process(job = nil, &block)
  return unless job ||= reserve

  job.worker = self
  working_on job
  perform(job, &block)
ensure
  done_working
end

#processedObject

How many jobs has this worker processed? Returns an int.



689
690
691
# File 'lib/resque/worker.rb', line 689

def processed
  Stat["processed:#{self}"]
end

#processed!Object

Tell Redis we’ve processed a job.



694
695
696
697
# File 'lib/resque/worker.rb', line 694

def processed!
  Stat << "processed"
  Stat << "processed:#{self}"
end

#procline(string) ⇒ Object

Given a string, sets the procline ($0) and logs. Procline is always in the format of:

RESQUE_PROCLINE_PREFIXresque-VERSION: STRING


822
823
824
825
# File 'lib/resque/worker.rb', line 822

def procline(string)
  $0 = "#{ENV['RESQUE_PROCLINE_PREFIX']}resque-#{Resque::Version}: #{string}"
  log_with_severity :debug, $0
end

#prune_dead_workersObject

Looks for any workers which should be running on this server and, if they’re not, removes them from Redis.

This is a form of garbage collection. If a server is killed by a hard shutdown, power failure, or something else beyond our control, the Resque workers will not die gracefully and therefore will leave stale state information in Redis.

By checking the current Redis state against the actual environment, we can determine if Redis is old and clean it up a bit.



564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
# File 'lib/resque/worker.rb', line 564

def prune_dead_workers
  all_workers = Worker.all

  unless all_workers.empty?
    known_workers = worker_pids
    all_workers_with_expired_heartbeats = Worker.all_workers_with_expired_heartbeats
  end

  all_workers.each do |worker|
    # If the worker hasn't sent a heartbeat, remove it from the registry.
    #
    # If the worker hasn't ever sent a heartbeat, we won't remove it since
    # the first heartbeat is sent before the worker is registred it means
    # that this is a worker that doesn't support heartbeats, e.g., another
    # client library or an older version of Resque. We won't touch these.
    if all_workers_with_expired_heartbeats.include?(worker)
      log_with_severity :info, "Pruning dead worker: #{worker}"
      worker.unregister_worker(PruneDeadWorkerDirtyExit.new(worker.to_s))
      next
    end

    host, pid, worker_queues_raw = worker.id.split(':')
    worker_queues = worker_queues_raw.split(",")
    unless @queues.include?("*") || (worker_queues.to_set == @queues.to_set)
      # If the worker we are trying to prune does not belong to the queues
      # we are listening to, we should not touch it.
      # Attempt to prune a worker from different queues may easily result in
      # an unknown class exception, since that worker could easily be even
      # written in different language.
      next
    end

    next unless host == hostname
    next if known_workers.include?(pid)

    log_with_severity :debug, "Pruning dead worker: #{worker}"
    worker.unregister_worker
  end
end

#queuesObject

Returns a list of queues to use when searching for a job. A splat (“*”) means you want every queue (in alpha order) - this can be useful for dynamically adding new queues.



174
175
176
177
# File 'lib/resque/worker.rb', line 174

def queues
  return @static_queues if @static_queues
  @queues.map { |queue| glob_match(queue) }.flatten.uniq
end

#queues=(queues) ⇒ Object



152
153
154
155
156
157
158
159
# File 'lib/resque/worker.rb', line 152

def queues=(queues)
  queues = queues.empty? ? (ENV["QUEUES"] || ENV['QUEUE']).to_s.split(',') : queues
  @queues = queues.map { |queue| queue.to_s.strip }
  unless ['*', '?', '{', '}', '[', ']'].any? {|char| @queues.join.include?(char) }
    @static_queues = @queues.flatten.uniq
  end
  validate_queues
end

#reconnectObject

Reconnect to Redis to avoid sharing a connection with the parent, retry up to 3 times with increasing delay before giving up.



320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
# File 'lib/resque/worker.rb', line 320

def reconnect
  tries = 0
  begin
    redis.client.reconnect
  rescue Redis::BaseConnectionError
    if (tries += 1) <= 3
      log_with_severity :error, "Error reconnecting to Redis; retrying"
      sleep(tries)
      retry
    else
      log_with_severity :error, "Error reconnecting to Redis; quitting"
      raise
    end
  end
end

#redisObject



19
20
21
# File 'lib/resque/worker.rb', line 19

def redis
  Resque.redis
end

#register_signal_handlersObject

Registers the various signal handlers a worker responds to.

TERM: Shutdown immediately, stop processing jobs.

INT: Shutdown immediately, stop processing jobs.

QUIT: Shutdown after the current job has finished processing. USR1: Kill the forked child immediately, continue processing jobs. USR2: Don’t process any new jobs CONT: Start processing jobs again after a USR2



388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
# File 'lib/resque/worker.rb', line 388

def register_signal_handlers
  trap('TERM') { graceful_term ? shutdown : shutdown!  }
  trap('INT')  { shutdown!  }

  begin
    trap('QUIT') { shutdown   }
    if term_child
      trap('USR1') { new_kill_child }
    else
      trap('USR1') { kill_child }
    end
    trap('USR2') { pause_processing }
    trap('CONT') { unpause_processing }
  rescue ArgumentError
    log_with_severity :warn, "Signals QUIT, USR1, USR2, and/or CONT not supported."
  end

  log_with_severity :debug, "Registered signals"
end

#register_workerObject

Registers ourself as a worker. Useful when entering the worker lifecycle on startup.



606
607
608
609
610
611
# File 'lib/resque/worker.rb', line 606

def register_worker
  redis.pipelined do
    redis.sadd(:workers, self)
    started!
  end
end

#report_failed_job(job, exception) ⇒ Object

Reports the exception and marks the job as failed



272
273
274
275
276
277
278
279
280
281
282
283
284
# File 'lib/resque/worker.rb', line 272

def report_failed_job(job,exception)
  log_with_severity :error, "#{job.inspect} failed: #{exception.inspect}"
  begin
    job.fail(exception)
  rescue Object => exception
    log_with_severity :error, "Received exception when reporting failure: #{exception.inspect}"
  end
  begin
    failed!
  rescue Object => exception
    log_with_severity :error, "Received exception when increasing failed jobs counter (redis issue) : #{exception.inspect}"
  end
end

#reserveObject

Attempts to grab a job off one of the provided queues. Returns nil if no job can be found.



302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
# File 'lib/resque/worker.rb', line 302

def reserve
  queues.each do |queue|
    log_with_severity :debug, "Checking #{queue}"
    if job = Resque.reserve(queue)
      log_with_severity :debug, "Found job on #{queue}"
      return job
    end
  end

  nil
rescue Exception => e
  log_with_severity :error, "Error reserving job: #{e.inspect}"
  log_with_severity :error, e.backtrace.join("\n")
  raise e
end

#run_hook(name, *args) ⇒ Object

Runs a named hook, passing along any arguments.



614
615
616
617
618
619
620
621
622
623
624
625
# File 'lib/resque/worker.rb', line 614

def run_hook(name, *args)
  return unless hooks = Resque.send(name)
  return if name == :before_first_fork && @before_first_fork_hook_ran
  msg = "Running #{name} hooks"
  msg << " with #{args.inspect}" if args.any?
  log_with_severity :info, msg

  hooks.each do |hook|
    args.any? ? hook.call(*args) : hook.call
    @before_first_fork_hook_ran = true if name == :before_first_fork
  end
end

#shutdownObject

Schedule this worker for shutdown. Will finish processing the current job.



427
428
429
430
# File 'lib/resque/worker.rb', line 427

def shutdown
  log_with_severity :info, 'Exiting...'
  @shutdown = true
end

#shutdown!Object

Kill the child and shutdown immediately. If not forking, abort this process.



434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
# File 'lib/resque/worker.rb', line 434

def shutdown!
  shutdown
  if term_child
    if fork_per_job?
      new_kill_child
    else
      # Raise TermException in the same process
      trap('TERM') do
        # ignore subsequent terms
      end
      raise TermException.new("SIGTERM")
    end
  else
    kill_child
  end
end

#shutdown?Boolean

Should this worker shutdown as soon as current job is finished?

Returns:

  • (Boolean)


452
453
454
# File 'lib/resque/worker.rb', line 452

def shutdown?
  @shutdown
end

#solaris_worker_pidsObject

Find Resque worker pids on Solaris.

Returns an Array of string pids of all the other workers on this machine. Useful when pruning dead workers on startup.



809
810
811
812
813
814
815
816
817
# File 'lib/resque/worker.rb', line 809

def solaris_worker_pids
  `ps -A -o pid,comm | grep "[r]uby" | grep -v "resque-web"`.split("\n").map do |line|
    real_pid = line.split(' ')[0]
    pargs_command = `pargs -a #{real_pid} 2>/dev/null | grep [r]esque | grep -v "resque-web"`
    if pargs_command.split(':')[1] == " resque-#{Resque::Version}"
      real_pid
    end
  end.compact
end

#start_heartbeatObject



502
503
504
505
506
507
508
509
510
# File 'lib/resque/worker.rb', line 502

def start_heartbeat
  heartbeat!
  @heart = Thread.new do
    loop do
      sleep(Resque.heartbeat_interval)
      heartbeat!
    end
  end
end

#startedObject

What time did this worker start? Returns an instance of ‘Time`



711
712
713
# File 'lib/resque/worker.rb', line 711

def started
  redis.get "worker:#{self}:started"
end

#started!Object

Tell Redis we’ve started



716
717
718
# File 'lib/resque/worker.rb', line 716

def started!
  redis.set("worker:#{self}:started", Time.now.to_s)
end

#startupObject

Runs all the methods needed when a worker begins its lifecycle.



359
360
361
362
363
364
365
366
367
368
369
370
# File 'lib/resque/worker.rb', line 359

def startup
  enable_gc_optimizations
  register_signal_handlers
  start_heartbeat
  prune_dead_workers
  run_hook :before_first_fork
  register_worker

  # Fix buffering so we can `rake resque:work > resque.log` and
  # get output from the child in there.
  $stdout.sync = true
end

#stateObject

Returns a symbol representing the current worker state, which can be either :working or :idle



748
749
750
# File 'lib/resque/worker.rb', line 748

def state
  redis.exists("worker:#{self}") ? :working : :idle
end

#unpause_processingObject

Start processing jobs again after a pause



548
549
550
551
552
# File 'lib/resque/worker.rb', line 548

def unpause_processing
  log_with_severity :info, "CONT received; resuming job processing"
  @paused = false
  run_hook :after_pause, self
end

#unregister_signal_handlersObject



408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
# File 'lib/resque/worker.rb', line 408

def unregister_signal_handlers
  trap('TERM') do
    trap ('TERM') do
      # ignore subsequent terms
    end
    raise TermException.new("SIGTERM")
  end
  trap('INT', 'DEFAULT')

  begin
    trap('QUIT', 'DEFAULT')
    trap('USR1', 'DEFAULT')
    trap('USR2', 'DEFAULT')
  rescue ArgumentError
  end
end

#unregister_worker(exception = nil) ⇒ Object

Unregisters ourself as a worker. Useful when shutting down.



632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
# File 'lib/resque/worker.rb', line 632

def unregister_worker(exception = nil)
  # If we're still processing a job, make sure it gets logged as a
  # failure.
  if (hash = processing) && !hash.empty?
    job = Job.new(hash['queue'], hash['payload'])
    # Ensure the proper worker is attached to this job, even if
    # it's not the precise instance that died.
    job.worker = self
    begin
      job.fail(exception || DirtyExit.new("Job still being processed"))
    rescue RuntimeError => e
      log_with_severity :error, e.message
    end
  end

  kill_background_threads

  redis.pipelined do
    redis.srem(:workers, self)
    redis.del("worker:#{self}")
    redis.del("worker:#{self}:started")
    redis.hdel(WORKER_HEARTBEAT_KEY, self.to_s)

    Stat.clear("processed:#{self}")
    Stat.clear("failed:#{self}")
  end
rescue Exception => exception_while_unregistering
  message = exception_while_unregistering.message
  if exception
    message = message + "\nOriginal Exception (#{exception.class}): #{exception.message}\n" +
                        "  #{exception.backtrace.join("  \n")}"
  end
  fail(exception_while_unregistering.class,
       message,
       exception_while_unregistering.backtrace)
end

#validate_queuesObject

A worker must be given a queue, otherwise it won’t know what to do with itself.

You probably never need to call this.



165
166
167
168
169
# File 'lib/resque/worker.rb', line 165

def validate_queues
  if @queues.nil? || @queues.empty?
    raise NoQueueError.new("Please give each worker at least one queue.")
  end
end

#verboseObject



836
837
838
# File 'lib/resque/worker.rb', line 836

def verbose
  @verbose
end

#verbose=(value) ⇒ Object



844
845
846
847
848
849
850
851
852
853
# File 'lib/resque/worker.rb', line 844

def verbose=(value);
  if value && !very_verbose
    Resque.logger.formatter = VerboseFormatter.new
    Resque.logger.level = Logger::INFO
  elsif !value
    Resque.logger.formatter = QuietFormatter.new
  end

  @verbose = value
end

#very_verboseObject



840
841
842
# File 'lib/resque/worker.rb', line 840

def very_verbose
  @very_verbose
end

#very_verbose=(value) ⇒ Object



855
856
857
858
859
860
861
862
863
864
865
866
867
# File 'lib/resque/worker.rb', line 855

def very_verbose=(value)
  if value
    Resque.logger.formatter = VeryVerboseFormatter.new
    Resque.logger.level = Logger::DEBUG
  elsif !value && verbose
    Resque.logger.formatter = VerboseFormatter.new
    Resque.logger.level = Logger::INFO
  else
    Resque.logger.formatter = QuietFormatter.new
  end

  @very_verbose = value
end

#will_fork?Boolean

Returns:

  • (Boolean)


738
739
740
# File 'lib/resque/worker.rb', line 738

def will_fork?
  !@cant_fork && fork_per_job?
end

#windows_worker_pidsObject

Returns an Array of string pids of all the other workers on this machine. Useful when pruning dead workers on startup.



792
793
794
795
# File 'lib/resque/worker.rb', line 792

def windows_worker_pids
  tasklist_output = `tasklist /FI "IMAGENAME eq ruby.exe" /FO list`.encode("UTF-8", Encoding.locale_charmap)
  tasklist_output.split($/).select { |line| line =~ /^PID:/}.collect{ |line| line.gsub /PID:\s+/, '' }
end

#work(interval = 5.0, &block) ⇒ Object

This is the main workhorse method. Called on a Worker instance, it begins the worker life cycle.

The following events occur during a worker’s life cycle:

  1. Startup: Signals are registered, dead workers are pruned,

    and this worker is registered.
    
  2. Work loop: Jobs are pulled from a queue and processed.

  3. Teardown: This worker is unregistered.

Can be passed a float representing the polling frequency. The default is 5 seconds, but for a semi-active site you may want to use a smaller value.

Also accepts a block which will be passed the job as soon as it has completed processing. Useful for testing.



201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# File 'lib/resque/worker.rb', line 201

def work(interval = 5.0, &block)
  interval = Float(interval)
  $0 = "resque: Starting"
  startup

  loop do
    break if shutdown?

    if not paused? and job = reserve
      log_with_severity :info, "got: #{job.inspect}"
      job.worker = self
      working_on job

      procline "Processing #{job.queue} since #{Time.now.to_i} [#{job.payload_class_name}]"
      if @child = fork(job)
        srand # Reseeding
        procline "Forked #{@child} at #{Time.now.to_i}"
        begin
          Process.waitpid(@child)
        rescue SystemCallError
          nil
        end
        job.fail(DirtyExit.new("Child process received unhandled signal #{$?.stopsig}")) if $?.signaled?
      else
        unregister_signal_handlers if will_fork? && term_child
        begin

          reconnect if will_fork?
          perform(job, &block)

        rescue Exception => exception
          report_failed_job(job,exception)
        end

        if will_fork?
          run_at_exit_hooks ? exit : exit!
        end
      end

      done_working
      @child = nil
    else
      break if interval.zero?
      log_with_severity :debug, "Sleeping for #{interval} seconds"
      procline paused? ? "Paused" : "Waiting for #{queues.join(',')}"
      sleep interval
    end
  end

  unregister_worker
rescue Exception => exception
  unless exception.class == SystemExit && !@child && run_at_exit_hooks
    log_with_severity :error, "Failed to start worker : #{exception.inspect}"

    unregister_worker(exception)
  end
end

#worker_pidsObject

Returns an Array of string pids of all the other workers on this machine. Useful when pruning dead workers on startup.



780
781
782
783
784
785
786
787
788
# File 'lib/resque/worker.rb', line 780

def worker_pids
  if RUBY_PLATFORM =~ /solaris/
    solaris_worker_pids
  elsif RUBY_PLATFORM =~ /mingw32/
    windows_worker_pids
  else
    linux_worker_pids
  end
end

#working?Boolean

Boolean - true if working, false if not

Returns:

  • (Boolean)


729
730
731
# File 'lib/resque/worker.rb', line 729

def working?
  state == :working
end

#working_on(job) ⇒ Object

Given a job, tells Redis we’re working on it. Useful for seeing what workers are doing and when.



671
672
673
674
675
676
677
# File 'lib/resque/worker.rb', line 671

def working_on(job)
  data = encode \
    :queue   => job.queue,
    :run_at  => Time.now.utc.iso8601,
    :payload => job.payload
  redis.set("worker:#{self}", data)
end