Class: Resque::Worker
- Inherits:
-
Object
- Object
- Resque::Worker
- Includes:
- SemanticLogger::Loggable
- Defined in:
- lib/resque_manager/overrides/resque/worker.rb
Constant Summary collapse
- @@local_ip =
nil
Class Method Summary collapse
- .start(options) ⇒ Object
-
.working ⇒ Object
logic for mappged_mget changed where it returns keys with nil values in latest redis gem.
Instance Method Summary collapse
- #all_workers_in_pid_working ⇒ Object
- #continue ⇒ Object
- #ip ⇒ Object
- #local_ip ⇒ Object
- #overview_message ⇒ Object
- #overview_message=(message) ⇒ Object
- #path ⇒ Object
- #pause ⇒ Object
-
#pause_key ⇒ Object
When the worker gets the -USR2 signal, to_s may give a different value for the thread and queue portion.
-
#pause_processing_with_pause_key ⇒ Object
Stop processing jobs after the current one has completed (if we’re currently running one).
- #paused ⇒ Object
-
#paused? ⇒ Boolean
are we paused? OVERRIDE so UI can tell if we’re paused.
- #pid ⇒ Object
-
#prune_dead_workers ⇒ Object
Looks for any workers which should be running on this server and, if they’re not, removes them from Redis.
- #queue ⇒ Object
-
#queues ⇒ Object
OVERRIDE for multithreaded workers.
- #queues_in_pid ⇒ Object
- #quit ⇒ Object
-
#reconnect ⇒ Object
override so we can synchronize the client on the reconnect for multithreaded workers.
- #restart ⇒ Object
-
#shutdown_with_multithreading ⇒ Object
Schedule this worker for shutdown.
- #shutdown_with_multithreading? ⇒ Boolean
-
#startup ⇒ Object
Runs all the methods needed when a worker begins its lifecycle.
- #thread ⇒ Object
-
#to_s ⇒ Object
(also: #id)
The string representation is the same as the id for this worker instance.
-
#unpause_processing_with_pause_key ⇒ Object
Start processing jobs again after a pause OVERRIDE to set remove redis key so UI knows it’s unpaused too Would prefer to call super but get no superclass method error.
-
#unregister_worker_with_pause(exception = nil) ⇒ Object
Unregisters ourself as a worker.
-
#work_with_multithreading(interval = 5.0, &block) ⇒ Object
This is the main workhorse method.
- #workers_in_pid ⇒ Object
Class Method Details
.start(options) ⇒ Object
224 225 226 227 228 229 230 231 232 233 234 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 224 def self.start() ips = [:hosts] application_path = [:application_path] queues = [:queues] if Rails.env =~ /development|test/ Thread.new(application_path, queues) { |application_path, queue| system("cd #{application_path || '.'} && bundle exec #{ResqueManager.resque_worker_rake || 'rake'} RAILS_ENV=#{Rails.env} QUEUE=#{queue} resque:work") } else Thread.new(ips, application_path, queues) { |ip_list, application_path, queue| system("cd #{Rails.root} && #{ResqueManager.resque_worker_cap || 'bundle exec cap'} #{Rails.env} resque:work host=#{ip_list} application_path=#{application_path} queue=#{queue}") } end end |
.working ⇒ Object
logic for mappged_mget changed where it returns keys with nil values in latest redis gem.
206 207 208 209 210 211 212 213 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 206 def self.working names = all return [] unless names.any? names.map! { |name| "worker:#{name}" } Resque.redis.mapped_mget(*names).map do |key, value| find key.sub("worker:", '') unless value.nil? end.compact end |
Instance Method Details
#all_workers_in_pid_working ⇒ Object
148 149 150 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 148 def all_workers_in_pid_working workers_in_pid.select { |w| (hash = w.processing) && !hash.empty? } end |
#continue ⇒ Object
259 260 261 262 263 264 265 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 259 def continue if Rails.env =~ /development|test/ system("kill -CONT #{self.pid}") else system("cd #{Rails.root} && bundle exec cap #{Rails.env} resque:continue_worker pid=#{self.pid} host=#{self.ip}") end end |
#ip ⇒ Object
48 49 50 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 48 def ip to_s.split(':').first[/\b(?:\d{1,3}\.){3}\d{1,3}\b/] end |
#local_ip ⇒ Object
10 11 12 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 10 def local_ip @@local_ip ||= UDPSocket.open { |s| s.connect('google.com', 1); s.addr.last } end |
#overview_message ⇒ Object
220 221 222 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 220 def job['overview_message'] end |
#overview_message=(message) ⇒ Object
215 216 217 218 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 215 def () data = encode(job.merge('overview_message' => )) Resque.redis.set("worker:#{self}", data) end |
#path ⇒ Object
36 37 38 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 36 def path to_s.split(':').fourth end |
#pause ⇒ Object
251 252 253 254 255 256 257 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 251 def pause if Rails.env =~ /development|test/ system("kill -USR2 #{self.pid}") else system("cd #{Rails.root} && bundle exec cap #{Rails.env} resque:pause_worker pid=#{self.pid} host=#{self.ip}") end end |
#pause_key ⇒ Object
When the worker gets the -USR2 signal, to_s may give a different value for the thread and queue portion
23 24 25 26 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 23 def pause_key key = to_s.split(':') "worker:#{key.first}:#{key.second}:all_workers:paused" end |
#pause_processing_with_pause_key ⇒ Object
Stop processing jobs after the current one has completed (if we’re currently running one). OVERRIDE to set a redis key so UI knows it’s paused too
100 101 102 103 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 100 def pause_processing_with_pause_key pause_processing_without_pause_key Resque.redis.set(pause_key, Time.now.to_s) end |
#paused ⇒ Object
87 88 89 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 87 def paused Resque.redis.get pause_key end |
#paused? ⇒ Boolean
are we paused? OVERRIDE so UI can tell if we’re paused
93 94 95 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 93 def paused? @paused || paused.present? end |
#pid ⇒ Object
28 29 30 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 28 def pid to_s.split(':').second end |
#prune_dead_workers ⇒ Object
Looks for any workers which should be running on this server and, if they’re not, removes them from Redis.
This is a form of garbage collection. If a server is killed by a hard shutdown, power failure, or something else beyond our control, the Resque workers will not die gracefully and therefore will leave stale state information in Redis.
By checking the current Redis state against the actual environment, we can determine if Redis is old and clean it up a bit.
127 128 129 130 131 132 133 134 135 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 127 def prune_dead_workers Worker.all.each do |worker| host, pid, thread, path, queues = worker.id.split(':') next unless host.include?(hostname) next if worker_pids.include?(pid) log! "Pruning dead worker: #{worker}" worker.unregister_worker end end |
#queue ⇒ Object
40 41 42 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 40 def queue to_s.split(':').fifth end |
#queues ⇒ Object
OVERRIDE for multithreaded workers
57 58 59 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 57 def queues Thread.current[:queues] == "*" ? Resque.queues.sort : Thread.current[:queues].split(',') end |
#queues_in_pid ⇒ Object
52 53 54 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 52 def queues_in_pid workers_in_pid.collect { |w| w.queue }.compact end |
#quit ⇒ Object
236 237 238 239 240 241 242 243 244 245 246 247 248 249 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 236 def quit if Rails.env =~ /development|test/ if RUBY_PLATFORM =~ /java/ #jruby doesn't trap the -QUIT signal #-TERM gracefully kills the main pid and does a -9 on the child if there is one. #Since jruby doesn't fork a child, the main worker is gracefully killed. system("kill -TERM #{self.pid}") else system("kill -QUIT #{self.pid}") end else system("cd #{Rails.root} && bundle exec cap #{Rails.env} resque:quit_worker pid=#{self.pid} host=#{self.ip} application_path=#{self.path}") end end |
#reconnect ⇒ Object
override so we can synchronize the client on the reconnect for multithreaded workers.
187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 187 def reconnect tries = 0 begin redis.synchronize do |client| client.reconnect end rescue Redis::BaseConnectionError if (tries += 1) <= 3 log "Error reconnecting to Redis; retrying" sleep(tries) retry else log "Error reconnecting to Redis; quitting" raise end end end |
#restart ⇒ Object
267 268 269 270 271 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 267 def restart queues = self.queues_in_pid.join('#') quit self.class.start(hosts: self.ip, queues: queues, application_path: self.path) end |
#shutdown_with_multithreading ⇒ Object
Schedule this worker for shutdown. Will finish processing the current job. OVERRIDE for multithreaded workers
80 81 82 83 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 80 def shutdown_with_multithreading Thread.list.each { |t| t[:shutdown] = true } shutdown_without_multithreading end |
#shutdown_with_multithreading? ⇒ Boolean
180 181 182 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 180 def shutdown_with_multithreading? shutdown_without_multithreading? || Thread.current[:shutdown] end |
#startup ⇒ Object
Runs all the methods needed when a worker begins its lifecycle. OVERRIDE for multithreaded workers
63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 63 def startup enable_gc_optimizations if Thread.current == Thread.main register_signal_handlers prune_dead_workers end run_hook :before_first_fork register_worker # Fix buffering so we can `rake resque:work > resque.log` and # get output from the child in there. $stdout.sync = true end |
#thread ⇒ Object
32 33 34 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 32 def thread to_s.split(':').third end |
#to_s ⇒ Object Also known as: id
The string representation is the same as the id for this worker instance. Can be used with Worker.find.
16 17 18 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 16 def to_s @to_s || "#{hostname}(#{local_ip}):#{Process.pid}:#{Thread.current.object_id}:#{Thread.current[:path]}:#{Thread.current[:queues]}" end |
#unpause_processing_with_pause_key ⇒ Object
Start processing jobs again after a pause OVERRIDE to set remove redis key so UI knows it’s unpaused too Would prefer to call super but get no superclass method error
110 111 112 113 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 110 def unpause_processing_with_pause_key unpause_processing_without_pause_key Resque.redis.del(pause_key) end |
#unregister_worker_with_pause(exception = nil) ⇒ Object
Unregisters ourself as a worker. Useful when shutting down. OVERRIDE to also remove the pause key Would prefer to call super but get no superclass method error
140 141 142 143 144 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 140 def unregister_worker_with_pause(exception = nil) unregister_worker_without_pause(exception) Resque.redis.del(pause_key) end |
#work_with_multithreading(interval = 5.0, &block) ⇒ Object
This is the main workhorse method. Called on a Worker instance, it begins the worker life cycle.
The following events occur during a worker’s life cycle:
-
Startup: Signals are registered, dead workers are pruned,
and this worker is registered. -
Work loop: Jobs are pulled from a queue and processed.
-
Teardown: This worker is unregistered.
Can be passed an integer representing the polling frequency. The default is 5 seconds, but for a semi-active site you may want to use a smaller value.
Also accepts a block which will be passed the job as soon as it has completed processing. Useful for testing. OVERRIDE for multithreaded workers
169 170 171 172 173 174 175 176 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 169 def work_with_multithreading(interval = 5.0, &block) work_without_multithreading(interval, &block) loop do #hang onto the process until all threads are done break if all_workers_in_pid_working.blank? sleep interval.to_i end end |
#workers_in_pid ⇒ Object
44 45 46 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 44 def workers_in_pid Array(Resque.redis.smembers(:workers)).select { |id| id =~ /\(#{ip}\):#{pid}/ }.map { |id| Resque::Worker.find(id) }.compact end |