Class: Resque::Worker

Inherits:
Object
  • Object
show all
Includes:
SemanticLogger::Loggable
Defined in:
lib/resque_manager/overrides/resque/worker.rb

Constant Summary collapse

@@local_ip =
nil

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.start(options) ⇒ Object



224
225
226
227
228
229
230
231
232
233
234
# File 'lib/resque_manager/overrides/resque/worker.rb', line 224

def self.start(options)
  ips = options[:hosts]
  application_path = options[:application_path]

  queues = options[:queues]
  if Rails.env =~ /development|test/
    Thread.new(application_path, queues) { |application_path, queue| system("cd #{application_path || '.'} && bundle exec #{ResqueManager.resque_worker_rake || 'rake'} RAILS_ENV=#{Rails.env} QUEUE=#{queue} resque:work") }
  else
    Thread.new(ips, application_path, queues) { |ip_list, application_path, queue| system("cd #{Rails.root} && #{ResqueManager.resque_worker_cap || 'bundle exec cap'} #{Rails.env} resque:work host=#{ip_list} application_path=#{application_path} queue=#{queue}") }
  end
end

.workingObject

logic for mappged_mget changed where it returns keys with nil values in latest redis gem.



206
207
208
209
210
211
212
213
# File 'lib/resque_manager/overrides/resque/worker.rb', line 206

def self.working
  names = all
  return [] unless names.any?
  names.map! { |name| "worker:#{name}" }
  Resque.redis.mapped_mget(*names).map do |key, value|
    find key.sub("worker:", '') unless value.nil?
  end.compact
end

Instance Method Details

#all_workers_in_pid_workingObject



148
149
150
# File 'lib/resque_manager/overrides/resque/worker.rb', line 148

def all_workers_in_pid_working
  workers_in_pid.select { |w| (hash = w.processing) && !hash.empty? }
end

#continueObject



259
260
261
262
263
264
265
# File 'lib/resque_manager/overrides/resque/worker.rb', line 259

def continue
  if Rails.env =~ /development|test/
    system("kill -CONT  #{self.pid}")
  else
    system("cd #{Rails.root} && bundle exec cap #{Rails.env} resque:continue_worker pid=#{self.pid} host=#{self.ip}")
  end
end

#ipObject



48
49
50
# File 'lib/resque_manager/overrides/resque/worker.rb', line 48

def ip
  to_s.split(':').first[/\b(?:\d{1,3}\.){3}\d{1,3}\b/]
end

#local_ipObject



10
11
12
# File 'lib/resque_manager/overrides/resque/worker.rb', line 10

def local_ip
  @@local_ip ||= UDPSocket.open { |s| s.connect('google.com', 1); s.addr.last }
end

#overview_messageObject



220
221
222
# File 'lib/resque_manager/overrides/resque/worker.rb', line 220

def overview_message
  job['overview_message']
end

#overview_message=(message) ⇒ Object



215
216
217
218
# File 'lib/resque_manager/overrides/resque/worker.rb', line 215

def overview_message=(message)
  data = encode(job.merge('overview_message' => message))
  Resque.redis.set("worker:#{self}", data)
end

#pathObject



36
37
38
# File 'lib/resque_manager/overrides/resque/worker.rb', line 36

def path
  to_s.split(':').fourth
end

#pauseObject



251
252
253
254
255
256
257
# File 'lib/resque_manager/overrides/resque/worker.rb', line 251

def pause
  if Rails.env =~ /development|test/
    system("kill -USR2  #{self.pid}")
  else
    system("cd #{Rails.root} && bundle exec cap #{Rails.env} resque:pause_worker pid=#{self.pid} host=#{self.ip}")
  end
end

#pause_keyObject

When the worker gets the -USR2 signal, to_s may give a different value for the thread and queue portion



23
24
25
26
# File 'lib/resque_manager/overrides/resque/worker.rb', line 23

def pause_key
  key = to_s.split(':')
  "worker:#{key.first}:#{key.second}:all_workers:paused"
end

#pause_processing_with_pause_keyObject

Stop processing jobs after the current one has completed (if we’re currently running one). OVERRIDE to set a redis key so UI knows it’s paused too



100
101
102
103
# File 'lib/resque_manager/overrides/resque/worker.rb', line 100

def pause_processing_with_pause_key
  pause_processing_without_pause_key
  Resque.redis.set(pause_key, Time.now.to_s)
end

#pausedObject



87
88
89
# File 'lib/resque_manager/overrides/resque/worker.rb', line 87

def paused
  Resque.redis.get pause_key
end

#paused?Boolean

are we paused? OVERRIDE so UI can tell if we’re paused

Returns:

  • (Boolean)


93
94
95
# File 'lib/resque_manager/overrides/resque/worker.rb', line 93

def paused?
  @paused || paused.present?
end

#pidObject



28
29
30
# File 'lib/resque_manager/overrides/resque/worker.rb', line 28

def pid
  to_s.split(':').second
end

#prune_dead_workersObject

Looks for any workers which should be running on this server and, if they’re not, removes them from Redis.

This is a form of garbage collection. If a server is killed by a hard shutdown, power failure, or something else beyond our control, the Resque workers will not die gracefully and therefore will leave stale state information in Redis.

By checking the current Redis state against the actual environment, we can determine if Redis is old and clean it up a bit.



127
128
129
130
131
132
133
134
135
# File 'lib/resque_manager/overrides/resque/worker.rb', line 127

def prune_dead_workers
  Worker.all.each do |worker|
    host, pid, thread, path, queues = worker.id.split(':')
    next unless host.include?(hostname)
    next if worker_pids.include?(pid)
    log! "Pruning dead worker: #{worker}"
    worker.unregister_worker
  end
end

#queueObject



40
41
42
# File 'lib/resque_manager/overrides/resque/worker.rb', line 40

def queue
  to_s.split(':').fifth
end

#queuesObject

OVERRIDE for multithreaded workers



57
58
59
# File 'lib/resque_manager/overrides/resque/worker.rb', line 57

def queues
  Thread.current[:queues] == "*" ? Resque.queues.sort : Thread.current[:queues].split(',')
end

#queues_in_pidObject



52
53
54
# File 'lib/resque_manager/overrides/resque/worker.rb', line 52

def queues_in_pid
  workers_in_pid.collect { |w| w.queue }.compact
end

#quitObject



236
237
238
239
240
241
242
243
244
245
246
247
248
249
# File 'lib/resque_manager/overrides/resque/worker.rb', line 236

def quit
  if Rails.env =~ /development|test/
    if RUBY_PLATFORM =~ /java/
      #jruby doesn't trap the -QUIT signal
      #-TERM gracefully kills the main pid and does a -9 on the child if there is one.
      #Since jruby doesn't fork a child, the main worker is gracefully killed.
      system("kill -TERM  #{self.pid}")
    else
      system("kill -QUIT  #{self.pid}")
    end
  else
    system("cd #{Rails.root} && bundle exec cap #{Rails.env} resque:quit_worker pid=#{self.pid} host=#{self.ip} application_path=#{self.path}")
  end
end

#reconnectObject

override so we can synchronize the client on the reconnect for multithreaded workers.



187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# File 'lib/resque_manager/overrides/resque/worker.rb', line 187

def reconnect
  tries = 0
  begin
    redis.synchronize do |client|
      client.reconnect
    end
  rescue Redis::BaseConnectionError
    if (tries += 1) <= 3
      log "Error reconnecting to Redis; retrying"
      sleep(tries)
      retry
    else
      log "Error reconnecting to Redis; quitting"
      raise
    end
  end
end

#restartObject



267
268
269
270
271
# File 'lib/resque_manager/overrides/resque/worker.rb', line 267

def restart
  queues = self.queues_in_pid.join('#')
  quit
  self.class.start(hosts: self.ip, queues: queues, application_path: self.path)
end

#shutdown_with_multithreadingObject

Schedule this worker for shutdown. Will finish processing the current job. OVERRIDE for multithreaded workers



80
81
82
83
# File 'lib/resque_manager/overrides/resque/worker.rb', line 80

def shutdown_with_multithreading
  Thread.list.each { |t| t[:shutdown] = true }
  shutdown_without_multithreading
end

#shutdown_with_multithreading?Boolean

Returns:

  • (Boolean)


180
181
182
# File 'lib/resque_manager/overrides/resque/worker.rb', line 180

def shutdown_with_multithreading?
  shutdown_without_multithreading? || Thread.current[:shutdown]
end

#startupObject

Runs all the methods needed when a worker begins its lifecycle. OVERRIDE for multithreaded workers



63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/resque_manager/overrides/resque/worker.rb', line 63

def startup
  enable_gc_optimizations
  if Thread.current == Thread.main
    register_signal_handlers
    prune_dead_workers
  end
  run_hook :before_first_fork
  register_worker

  # Fix buffering so we can `rake resque:work > resque.log` and
  # get output from the child in there.
  $stdout.sync = true
end

#threadObject



32
33
34
# File 'lib/resque_manager/overrides/resque/worker.rb', line 32

def thread
  to_s.split(':').third
end

#to_sObject Also known as: id

The string representation is the same as the id for this worker instance. Can be used with Worker.find.



16
17
18
# File 'lib/resque_manager/overrides/resque/worker.rb', line 16

def to_s
  @to_s || "#{hostname}(#{local_ip}):#{Process.pid}:#{Thread.current.object_id}:#{Thread.current[:path]}:#{Thread.current[:queues]}"
end

#unpause_processing_with_pause_keyObject

Start processing jobs again after a pause OVERRIDE to set remove redis key so UI knows it’s unpaused too Would prefer to call super but get no superclass method error



110
111
112
113
# File 'lib/resque_manager/overrides/resque/worker.rb', line 110

def unpause_processing_with_pause_key
  unpause_processing_without_pause_key
  Resque.redis.del(pause_key)
end

#unregister_worker_with_pause(exception = nil) ⇒ Object

Unregisters ourself as a worker. Useful when shutting down. OVERRIDE to also remove the pause key Would prefer to call super but get no superclass method error



140
141
142
143
144
# File 'lib/resque_manager/overrides/resque/worker.rb', line 140

def unregister_worker_with_pause(exception = nil)
  unregister_worker_without_pause(exception)

  Resque.redis.del(pause_key)
end

#work_with_multithreading(interval = 5.0, &block) ⇒ Object

This is the main workhorse method. Called on a Worker instance, it begins the worker life cycle.

The following events occur during a worker’s life cycle:

  1. Startup: Signals are registered, dead workers are pruned,

    and this worker is registered.
    
  2. Work loop: Jobs are pulled from a queue and processed.

  3. Teardown: This worker is unregistered.

Can be passed an integer representing the polling frequency. The default is 5 seconds, but for a semi-active site you may want to use a smaller value.

Also accepts a block which will be passed the job as soon as it has completed processing. Useful for testing. OVERRIDE for multithreaded workers



169
170
171
172
173
174
175
176
# File 'lib/resque_manager/overrides/resque/worker.rb', line 169

def work_with_multithreading(interval = 5.0, &block)
  work_without_multithreading(interval, &block)
  loop do
    #hang onto the process until all threads are done
    break if all_workers_in_pid_working.blank?
    sleep interval.to_i
  end
end

#workers_in_pidObject



44
45
46
# File 'lib/resque_manager/overrides/resque/worker.rb', line 44

def workers_in_pid
  Array(Resque.redis.smembers(:workers)).select { |id| id =~ /\(#{ip}\):#{pid}/ }.map { |id| Resque::Worker.find(id) }.compact
end