Class: Resque::Worker

Inherits:
Object
Extended by:
Helpers
Includes:
Helpers
Defined in:
lib/resque/worker.rb

Overview

A Resque Worker processes jobs. On platforms that support fork(2), the worker will fork off a child to process each job. This ensures a clean slate when beginning the next job and cuts down on gradual memory growth as well as low-level failures.

It also ensures workers are always listening to signals from you, their master, and can react accordingly.


Methods included from Helpers

classify, constantize, decode, encode, redis

Constructor Details

#initialize(*queues) ⇒ Worker

Workers should be initialized with an array of string queue names. The order is important: a Worker will check the first queue given for a job. If none is found, it will check the second queue name given. If a job is found, it will be processed. Upon completion, the Worker will again check the first queue given, and so forth. In this way the queue list passed to a Worker on startup defines the priorities of queues.

If passed a single “*”, this Worker will operate on all queues in alphabetical order. With this approach, queues can be added or removed dynamically without restarting workers.



# File 'lib/resque/worker.rb', line 75

def initialize(*queues)
  @queues = queues
  validate_queues
end
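
The priority rule described above can be illustrated without Redis. The following is a minimal sketch, not Resque's implementation: `reserve_from` and the in-memory `store` hash are hypothetical stand-ins for the Redis-backed queue lookup.

```ruby
# Hypothetical in-memory stand-in for Redis-backed queues.
# Scans queue names in the order given and returns the first job found.
def reserve_from(store, queue_names)
  queue_names.each do |name|
    job = store.fetch(name, []).shift
    return [name, job] if job
  end
  nil
end

store = {
  'critical' => ['c1'],
  'low'      => ['l1', 'l2']
}

# After every job the scan restarts at the first queue, so 'low' is
# only drained once 'critical' is empty.
order = ['critical', 'low']
while (found = reserve_from(store, order))
  queue, job = found
  puts "#{queue}: #{job}"
end
```

Because the scan always restarts from the first name, a busy high-priority queue can starve the queues listed after it.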

Instance Attribute Details

#cant_fork ⇒ Object

Boolean indicating whether this worker is unable to fork. Set automatically if a fork(2) call fails.

# File 'lib/resque/worker.rb', line 21

def cant_fork
  @cant_fork
end

#to_s ⇒ Object Also known as: id

The string representation is the same as the id for this worker instance. Can be used with `Worker.find`.

# File 'lib/resque/worker.rb', line 379

def to_s
  @to_s ||= "#{hostname}:#{Process.pid}:#{@queues.join(',')}"
end
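
As a rough illustration of the id format, the sketch below builds an id in the same `hostname:pid:queues` shape and recovers the queue list the way `.find` does. The helper names `build_worker_id` and `queues_from_worker_id`, and the sample hostname, are hypothetical and not part of Resque.

```ruby
# Builds an id in the "hostname:pid:queues" shape shown in #to_s.
def build_worker_id(hostname, pid, queues)
  "#{hostname}:#{pid}:#{queues.join(',')}"
end

# Recovers the queue list the way .find does: take the last
# colon-separated field and split it on commas.
def queues_from_worker_id(worker_id)
  worker_id.split(':')[-1].split(',')
end

id = build_worker_id('app1.example.com', 4242, ['critical', 'low'])
puts id                                # app1.example.com:4242:critical,low
puts queues_from_worker_id(id).inspect # ["critical", "low"]
```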

#verbose ⇒ Object

Whether the worker should log basic info to STDOUT.

# File 'lib/resque/worker.rb', line 14

def verbose
  @verbose
end

#very_verbose ⇒ Object

Whether the worker should log lots of info to STDOUT.

# File 'lib/resque/worker.rb', line 17

def very_verbose
  @very_verbose
end

Class Method Details

.all ⇒ Object

Returns an array of all worker objects.

# File 'lib/resque/worker.rb', line 26

def self.all
  redis.smembers(:workers).map { |id| find(id) }
end

.attach(worker_id) ⇒ Object

Alias of `find`.

# File 'lib/resque/worker.rb', line 54

def self.attach(worker_id)
  find(worker_id)
end

.exists?(worker_id) ⇒ Boolean

Given a string worker id, return a boolean indicating whether the worker exists

Returns:

  • (Boolean)


# File 'lib/resque/worker.rb', line 60

def self.exists?(worker_id)
  redis.sismember(:workers, worker_id)
end

.find(worker_id) ⇒ Object

Returns a single worker object. Accepts a string id.



# File 'lib/resque/worker.rb', line 42

def self.find(worker_id)
  if exists? worker_id
    queues = worker_id.split(':')[-1].split(',')
    worker = new(*queues)
    worker.to_s = worker_id
    worker
  else
    nil
  end
end

.working ⇒ Object

Returns an array of all worker objects currently processing jobs.

# File 'lib/resque/worker.rb', line 32

def self.working
  names = all
  return [] unless names.any?
  names.map! { |name| "worker:#{name}" }
  redis.mapped_mget(*names).keys.map do |key|
    find key.sub("worker:", '')
  end
end

Instance Method Details

#==(other) ⇒ Object

Is this worker the same as another worker?



# File 'lib/resque/worker.rb', line 369

def ==(other)
  to_s == other.to_s
end

#done_working ⇒ Object

Called when we are done working - clears our `working_on` state and tells Redis we processed a job.

# File 'lib/resque/worker.rb', line 309

def done_working
  processed!
  redis.del("worker:#{self}")
end

#enable_gc_optimizations ⇒ Object

Enables GC optimizations if you’re running REE. See www.rubyenterpriseedition.com/faq.html#adapt_apps_for_cow

# File 'lib/resque/worker.rb', line 209

def enable_gc_optimizations
  if GC.respond_to?(:copy_on_write_friendly=)
    GC.copy_on_write_friendly = true
  end
end

#failed ⇒ Object

How many failed jobs has this worker seen? Returns an int.

# File 'lib/resque/worker.rb', line 326

def failed
  Stat["failed:#{self}"]
end

#failed! ⇒ Object

Tells Redis we’ve failed a job.

# File 'lib/resque/worker.rb', line 331

def failed!
  Stat << "failed"
  Stat << "failed:#{self}"
end

#fork ⇒ Object

Not every platform supports fork. Here we do our magic to determine if yours does.

# File 'lib/resque/worker.rb', line 186

def fork
  @cant_fork = true if $TESTING

  return if @cant_fork

  begin
    Kernel.fork
  rescue NotImplementedError
    @cant_fork = true
    nil
  end
end
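
The fallback idea can be sketched in isolation: attempt `Kernel.fork`, and run the work inline when the platform raises `NotImplementedError`. The helper `run_isolated` and its `:forked` / `:inline` return values are hypothetical, introduced only for this illustration; Resque itself records the failure in `@cant_fork`.

```ruby
# Runs the block in a forked child when possible, otherwise inline.
# Returns :forked or :inline so the caller can see which path ran.
def run_isolated
  $stdout.flush # avoid duplicating buffered output in the child
  begin
    pid = Kernel.fork
  rescue NotImplementedError
    # The platform has no fork(2); fall back to running inline.
    yield
    return :inline
  end

  if pid
    Process.wait(pid) # parent: block until the child exits
    :forked
  else
    yield             # child: do the work
    $stdout.flush
    exit!(0)          # leave without running at_exit handlers
  end
end

mode = run_isolated { puts "working in pid #{Process.pid}" }
puts "ran #{mode}"
```

Exiting the child with `exit!` mirrors the listing for `#work`, where the child leaves immediately once the job has been processed.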

#hostname ⇒ Object

chomp’d hostname of this machine

# File 'lib/resque/worker.rb', line 385

def hostname
  @hostname ||= `hostname`.chomp
end

#idle? ⇒ Boolean

Boolean - true if idle, false if not

Returns:

  • (Boolean)

# File 'lib/resque/worker.rb', line 358

def idle?
  state == :idle
end

#inspect ⇒ Object

# File 'lib/resque/worker.rb', line 373

def inspect
  "#<Worker #{to_s}>"
end

#job ⇒ Object Also known as: processing

Returns a hash explaining the Job we’re currently processing, if any.

# File 'lib/resque/worker.rb', line 347

def job
  decode(redis.get("worker:#{self}")) || {}
end

#kill_child ⇒ Object

Kills the forked child immediately, without remorse. The job it is processing will not be completed.

# File 'lib/resque/worker.rb', line 246

def kill_child
  if @child
    log! "Killing child at #{@child}"
    if system("ps -o pid,state -p #{@child}")
      Process.kill("KILL", @child) rescue nil
    else
      log! "Child #{@child} not found, restarting."
      shutdown
    end
  end
end

#log(message) ⇒ Object

Log a message to STDOUT if we are verbose or very_verbose.



# File 'lib/resque/worker.rb', line 398

def log(message)
  if verbose
    puts "*** #{message}"
  elsif very_verbose
    time = Time.now.strftime('%I:%M:%S %Y-%m-%d')
    puts "** [#{time}] #$$: #{message}"
  end
end

#log!(message) ⇒ Object

Logs a very verbose message to STDOUT.



# File 'lib/resque/worker.rb', line 408

def log!(message)
  log message if very_verbose
end

#process(job = nil) ⇒ Object

Processes a single job. If none is given, it will try to produce one.



# File 'lib/resque/worker.rb', line 145

def process(job = nil)
  return unless job ||= reserve

  begin
    working_on job
    job.perform
  rescue Object => e
    log "#{job.inspect} failed: #{e.inspect}"
    job.fail(e)
    failed!
  else
    log "done: #{job.inspect}"
  ensure
    yield job if block_given?
    done_working
  end
end
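
The control flow of `#process` relies on Ruby's `rescue` / `else` / `ensure` clauses. The sketch below reproduces that shape with a hypothetical `FakeJob` and `run_job`, neither of which is part of Resque. It rescues `StandardError` for simplicity, whereas the listing above rescues `Object` and therefore catches every exception.

```ruby
# Hypothetical stand-in for a job; not Resque::Job.
FakeJob = Struct.new(:name, :should_fail) do
  def perform
    raise "boom in #{name}" if should_fail
  end
end

# Mirrors the control flow of #process: record failures in rescue,
# record success in else, and always run the cleanup in ensure.
def run_job(job, events)
  job.perform
rescue StandardError => e
  events << [:failed, job.name, e.message]
else
  events << [:done, job.name]
ensure
  events << [:cleanup, job.name]
end

events = []
run_job(FakeJob.new('ok', false), events)
run_job(FakeJob.new('bad', true), events)
events.each { |event| puts event.inspect }
```

The cleanup entry appears after both the successful and the failing job, which corresponds to `done_working` being called in either case.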

#processed ⇒ Object

How many jobs has this worker processed? Returns an int.

# File 'lib/resque/worker.rb', line 315

def processed
  Stat["processed:#{self}"]
end

#processed! ⇒ Object

Tell Redis we’ve processed a job.

# File 'lib/resque/worker.rb', line 320

def processed!
  Stat << "processed"
  Stat << "processed:#{self}"
end

#prune_dead_workers ⇒ Object

Looks for any workers which should be running on this server and, if they’re not, removes them from Redis.

This is a form of garbage collection. If a server is killed by a hard shutdown, power failure, or something else beyond our control, the Resque workers will not die gracefully and therefore will leave stale state information in Redis.

By checking the current Redis state against the actual environment, we can determine whether the state in Redis is stale and clean it up.

# File 'lib/resque/worker.rb', line 268

def prune_dead_workers
  Worker.all.each do |worker|
    host, pid, queues = worker.id.split(':')
    next unless host == hostname
    next if worker_pids.include?(pid)
    log! "Pruning dead worker: #{worker}"
    worker.unregister_worker
  end
end
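
The selection rule can be sketched with plain values in place of Redis and `ps`. The helper `dead_worker_ids` and the sample ids are hypothetical; pids are compared as strings, matching what `worker_pids` returns.

```ruby
# Returns the ids that would be pruned: workers registered for this
# host whose pid is no longer among the live pids.
def dead_worker_ids(registered_ids, this_host, live_pids)
  registered_ids.select do |id|
    host, pid, _queues = id.split(':')
    host == this_host && !live_pids.include?(pid)
  end
end

registered = [
  'web1:100:critical',
  'web1:200:low',
  'web2:300:low' # other host: never pruned from here
]

puts dead_worker_ids(registered, 'web1', ['100']).inspect
# → ["web1:200:low"]
```

Workers registered from other hosts are skipped because their pids cannot be checked from this machine.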

#queues ⇒ Object

Returns a list of queues to use when searching for a job. A splat (“*”) means you want every queue (in alpha order) - this can be useful for dynamically adding new queues.

# File 'lib/resque/worker.rb', line 180

def queues
  @queues[0] == "*" ? Resque.queues.sort : @queues
end
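
A small sketch of the splat expansion, with the list of known queues passed in explicitly instead of being read from `Resque.queues`. The helper name `effective_queues` and the sample queue names are hypothetical.

```ruby
# Expands a lone "*" into every known queue in alphabetical order;
# any other list is returned unchanged, preserving its priority order.
def effective_queues(configured, known_queues)
  configured[0] == '*' ? known_queues.sort : configured
end

puts effective_queues(['*'], ['mail', 'billing', 'images']).inspect
# → ["billing", "images", "mail"]
puts effective_queues(['high', 'low'], ['mail', 'billing']).inspect
# → ["high", "low"]
```

Since the expansion happens on every call, a queue created after the worker started is picked up on the next pass.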

#register_signal_handlers ⇒ Object

Registers the various signal handlers a worker responds to.

TERM: Shut down immediately, stop processing jobs.

INT: Shut down immediately, stop processing jobs.

QUIT: Shut down after the current job has finished processing.

USR1: Kill the forked child immediately, continue processing jobs.

# File 'lib/resque/worker.rb', line 221

def register_signal_handlers
  trap('TERM') { shutdown!  }
  trap('INT')  { shutdown!  }
  unless defined? JRUBY_VERSION
    trap('QUIT') { shutdown   }
    trap('USR1') { kill_child }
  end
  log! "Registered signals"
end
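
To see how `trap` handlers of this kind behave, the sketch below installs two handlers that only set flags, then sends `USR1` to the current process. It assumes a Unix-like platform where `USR1` and `QUIT` can be trapped; `install_demo_handlers` and the flag names are hypothetical.

```ruby
# Installs handlers that only record what was requested; a real worker
# would call shutdown or kill_child here instead.
def install_demo_handlers(flags)
  trap('USR1') { flags[:kill_child] = true }
  trap('QUIT') { flags[:graceful] = true }
end

flags = {}
install_demo_handlers(flags)

# Deliver USR1 to ourselves. Handlers run asynchronously, so poll
# briefly instead of assuming the flag is set immediately.
Process.kill('USR1', Process.pid)
20.times do
  break if flags[:kill_child]
  sleep 0.05
end

puts flags.inspect
```

Keeping the handler body down to a flag assignment, as `shutdown` does with `@shutdown`, lets the main loop decide when it is safe to act on the request.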

#register_worker ⇒ Object

Registers ourself as a worker. Useful when entering the worker lifecycle on startup.

# File 'lib/resque/worker.rb', line 280

def register_worker
  redis.sadd(:workers, self)
  started!
end

#reserve ⇒ Object

Attempts to grab a job off one of the provided queues. Returns nil if no job can be found.

# File 'lib/resque/worker.rb', line 165

def reserve
  queues.each do |queue|
    log! "Checking #{queue}"
    if job = Resque::Job.reserve(queue)
      log! "Found job on #{queue}"
      return job
    end
  end

  nil
end

#shutdown ⇒ Object

Schedule this worker for shutdown. Will finish processing the current job.

# File 'lib/resque/worker.rb', line 233

def shutdown
  log 'Exiting...'
  @shutdown = true
end

#shutdown! ⇒ Object

Kill the child and shut down immediately.

# File 'lib/resque/worker.rb', line 239

def shutdown!
  shutdown
  kill_child
end

#started ⇒ Object

What time did this worker start? Returns the value stored by `started!`, a string produced by `Time#to_s`, rather than an instance of `Time`.

# File 'lib/resque/worker.rb', line 337

def started
  redis.get "worker:#{self}:started"
end

#started! ⇒ Object

Tell Redis we’ve started.

# File 'lib/resque/worker.rb', line 342

def started!
  redis.set("worker:#{self}:started", Time.now.to_s)
end

#startup ⇒ Object

Runs all the methods needed when a worker begins its lifecycle.

# File 'lib/resque/worker.rb', line 200

def startup
  enable_gc_optimizations
  register_signal_handlers
  prune_dead_workers
  register_worker
end

#state ⇒ Object

Returns a symbol representing the current worker state, which can be either :working or :idle

# File 'lib/resque/worker.rb', line 364

def state
  redis.exists("worker:#{self}") ? :working : :idle
end

#unregister_worker ⇒ Object

Unregisters ourself as a worker. Useful when shutting down.

# File 'lib/resque/worker.rb', line 286

def unregister_worker
  done_working

  redis.srem(:workers, self)
  redis.del("worker:#{self}:started")

  Stat.clear("processed:#{self}")
  Stat.clear("failed:#{self}")
end

#validate_queues ⇒ Object

A worker must be given a queue, otherwise it won’t know what to do with itself.

You probably never need to call this.

# File 'lib/resque/worker.rb', line 84

def validate_queues
  if @queues.nil? || @queues.empty?
    raise NoQueueError.new("Please give each worker at least one queue.")
  end
end

#work(interval = 5, &block) ⇒ Object

This is the main workhorse method. Called on a Worker instance, it begins the worker life cycle.

The following events occur during a worker’s life cycle:

  1. startup: Signals are registered, dead workers are pruned, and this worker is registered.

  2. work loop: Jobs are pulled from a queue and processed.

  3. teardown: This worker is unregistered.

Can be passed an integer representing the polling interval in seconds. The default is 5 seconds, but for a semi-active site you may want to use a smaller value. An interval of 0 makes the worker exit as soon as no job is found.

Also accepts a block which will be passed the job as soon as it has completed processing. Useful for testing.



# File 'lib/resque/worker.rb', line 106

def work(interval = 5, &block)
  $0 = "resque: Starting"
  startup

  loop do
    break if @shutdown

    if job = reserve
      log "got: #{job.inspect}"

      if @child = fork
        rand # Reseeding
        procline = "resque: Forked #{@child} at #{Time.now.to_i}"
        $0 = procline
        log! procline
        Process.wait
      else
        procline = "resque: Processing #{job.queue} since #{Time.now.to_i}"
        $0 = procline
        log! procline
        process(job, &block)
        exit! unless @cant_fork
      end

      @child = nil
    else
      break if interval.to_i == 0
      log! "Sleeping for #{interval.to_i}"
      $0 = "resque: Waiting for #{@queues.join(',')}"
      sleep interval.to_i
    end
  end

ensure
  unregister_worker
end
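
The three phases can be sketched with an in-memory worker. `MiniWorker` is hypothetical and deliberately leaves out forking, signal handling, and Redis; it only shows the startup / loop / teardown shape, the block call after each job, and the way an interval of 0 ends the loop once no job is found.

```ruby
# Hypothetical in-memory worker showing the three phases of #work.
class MiniWorker
  attr_reader :events

  def initialize(jobs)
    @jobs = jobs
    @events = []
  end

  def work(interval = 5)
    @events << :startup
    loop do
      if (job = @jobs.shift)
        @events << [:processed, job]
        yield job if block_given?
      else
        # As in #work, an interval of 0 means "stop once idle".
        break if interval.to_i == 0
        sleep interval.to_i
      end
    end
  ensure
    # Teardown runs even if a job raises.
    @events << :teardown
  end
end

worker = MiniWorker.new(%w[a b])
worker.work(0) { |job| puts "finished #{job}" }
puts worker.events.inspect
# → [:startup, [:processed, "a"], [:processed, "b"], :teardown]
```

Passing 0 as the interval is convenient in tests, since the call returns once the queue has been drained instead of polling forever.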

#worker_pids ⇒ Object

Returns an array of string pids of all the other workers on this machine. Useful when pruning dead workers on startup.

# File 'lib/resque/worker.rb', line 391

def worker_pids
  `ps -A -o pid,command | grep [r]esque`.split("\n").map do |line|
    line.split(' ')[0]
  end
end
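
Two details of the listing above are worth spelling out. The pattern `[r]esque` matches lines containing `resque` but not the `grep` process itself, whose command line contains the literal text `[r]esque`. The pid is then taken as the first whitespace-separated field of each line. The sketch below applies the same parsing to made-up sample output; `pids_from_ps` is a hypothetical helper.

```ruby
# Extracts the pid column from `ps -A -o pid,command`-style output.
def pids_from_ps(output)
  output.split("\n").map { |line| line.split(' ')[0] }
end

# Sample text invented for illustration; real output varies by system.
sample = <<~PS
   4242 resque: Waiting for critical,low
  14310 resque: Forked 14311 at 1262284800
PS

puts pids_from_ps(sample).inspect
# → ["4242", "14310"]
```

The pids come back as strings, which is why `prune_dead_workers` can compare them directly against the pid field of a worker id.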

#working? ⇒ Boolean

Boolean - true if working, false if not

Returns:

  • (Boolean)

# File 'lib/resque/worker.rb', line 353

def working?
  state == :working
end

#working_on(job) ⇒ Object

Given a job, tells Redis we’re working on it. Useful for seeing what workers are doing and when.



# File 'lib/resque/worker.rb', line 298

def working_on(job)
  job.worker = self
  data = encode \
    :queue   => job.queue,
    :run_at  => Time.now.to_s,
    :payload => job.payload
  redis.set("worker:#{self}", data)
end
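
For a sense of what ends up stored under the `worker:<id>` key, the sketch below builds a record with the same three fields. It assumes the `encode` helper produces JSON and uses the standard `json` library as a stand-in; `status_record` and the sample payload are hypothetical.

```ruby
require 'json'

# Builds a status record with the same fields as #working_on.
# JSON is assumed as the encoding for this illustration.
def status_record(queue, payload, now = Time.now)
  JSON.generate({
    'queue'   => queue,
    'run_at'  => now.to_s,
    'payload' => payload
  })
end

record = status_record('critical', { 'class' => 'Archive', 'args' => [42] })
puts record

# Reading it back, roughly as #job does with decode:
puts JSON.parse(record)['queue'] # critical
```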