Class: Resque::Worker
- Inherits:
-
Object
- Object
- Resque::Worker
- Defined in:
- lib/resque_manager/overrides/resque/worker.rb
Constant Summary collapse
- @@local_ip =
nil
Class Method Summary collapse
- .start(options) ⇒ Object
-
.working ⇒ Object
Logic for mapped_mget changed: in the latest redis gem it returns keys with nil values.
Instance Method Summary collapse
- #all_workers_in_pid_working ⇒ Object
- #continue ⇒ Object
- #ip ⇒ Object
- #local_ip ⇒ Object
- #overview_message ⇒ Object
- #overview_message=(message) ⇒ Object
- #path ⇒ Object
- #pause ⇒ Object
-
#pause_key ⇒ Object
When the worker gets the -USR2 signal, to_s may give a different value for the thread and queue portion.
-
#pause_processing ⇒ Object
Stop processing jobs after the current one has completed (if we’re currently running one).
- #paused ⇒ Object
-
#paused? ⇒ Boolean
are we paused? OVERRIDE so UI can tell if we’re paused.
- #pid ⇒ Object
-
#prune_dead_workers ⇒ Object
Looks for any workers which should be running on this server and, if they’re not, removes them from Redis.
- #queue ⇒ Object
-
#queues ⇒ Object
OVERRIDE for multithreaded workers.
- #queues_in_pid ⇒ Object
- #quit ⇒ Object
- #restart ⇒ Object
-
#shutdown ⇒ Object
Schedule this worker for shutdown.
-
#startup ⇒ Object
Runs all the methods needed when a worker begins its lifecycle.
- #thread ⇒ Object
-
#to_s ⇒ Object
(also: #id)
The string representation is the same as the id for this worker instance.
-
#unpause_processing ⇒ Object
Start processing jobs again after a pause. OVERRIDE to remove the redis key so the UI knows it’s unpaused too. Would prefer to call super but get a "no superclass method" error.
-
#unregister_worker_with_pause(exception = nil) ⇒ Object
Unregisters ourself as a worker.
-
#work(interval = 5.0, &block) ⇒ Object
This is the main workhorse method.
- #workers_in_pid ⇒ Object
Class Method Details
.start(options) ⇒ Object
236 237 238 239 240 241 242 243 244 245 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 236
# Launches a worker from a background thread.
#
# In development/test the worker rake task is run locally from the
# application path; in any other environment the `resque:work` capistrano
# task is run from Rails.root.
#
# @param options [Hash] reads :hosts, :application_path and :queues
# @return [Thread] the thread that runs the shell command
def self.start(options)
  ips = options[:hosts]
  application_path = options[:application_path]
  queues = options[:queues]
  if Rails.env =~ /development|test/
    # Block parameters are named differently from the outer locals so they do not shadow them.
    Thread.new(application_path, queues) do |app_path, queue|
      system("cd #{app_path || '.'}; bundle exec #{ResqueManager.resque_worker_rake || 'rake'} RAILS_ENV=#{Rails.env} QUEUE=#{queue} resque:work")
    end
  else
    Thread.new(ips, application_path, queues) do |ip_list, app_path, queue|
      system("cd #{Rails.root}; bundle exec cap #{Rails.env} resque:work host=#{ip_list} application_path=#{app_path} queue=#{queue}")
    end
  end
end
.working ⇒ Object
Logic for mapped_mget changed: in the latest redis gem it returns keys with nil values.
218 219 220 221 222 223 224 225 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 218
# Returns the workers whose "worker:<id>" key holds a non-nil value in Redis.
#
# mapped_mget may hand back keys whose value is nil; those entries are
# skipped, as are ids for which `find` returns nil.
def self.working
  names = all
  return [] unless names.any?

  names.map! { |name| "worker:#{name}" }
  Resque.redis.mapped_mget(*names).each_with_object([]) do |(key, value), found|
    next if value.nil?

    worker = find(key.sub("worker:", ''))
    found << worker unless worker.nil?
  end
end
Instance Method Details
#all_workers_in_pid_working ⇒ Object
142 143 144 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 142
# Returns the workers in this pid whose `processing` value is present and
# not empty.
def all_workers_in_pid_working
  workers_in_pid.reject do |worker|
    current_job = worker.processing
    !current_job || current_job.empty?
  end
end
#continue ⇒ Object
270 271 272 273 274 275 276 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 270
# Sends CONT to this worker's pid: directly with `kill` in development/test,
# otherwise through the `resque:continue_worker` capistrano task.
#
# @return whatever `system` returns for the command
def continue
  command =
    if Rails.env =~ /development|test/
      "kill -CONT #{self.pid}"
    else
      "cd #{Rails.root}; bundle exec cap #{Rails.env} resque:continue_worker pid=#{self.pid} host=#{self.ip}"
    end
  system(command)
end
#ip ⇒ Object
45 46 47 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 45
# Extracts the dotted-quad address from the first ':'-separated segment of
# the worker id; nil when that segment contains no such address.
def ip
  host_segment = to_s.split(':').first
  host_segment[/\b(?:\d{1,3}\.){3}\d{1,3}\b/]
end
#local_ip ⇒ Object
7 8 9 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 7
# Resolves this machine's hostname to an address and caches the result in
# the @@local_ip class variable, so the lookup runs only while it is unset.
def local_ip
  @@local_ip ||= begin
    host_name = Socket.gethostname
    IPSocket.getaddress(host_name)
  end
end
#overview_message ⇒ Object
232 233 234 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 232
# Reads the 'overview_message' entry of this worker's current job data.
#
# @return the stored message, or nil when the job data has no such key
def overview_message
  job['overview_message']
end
#overview_message=(message) ⇒ Object
227 228 229 230 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 227
# Stores a message alongside this worker's current job data.
#
# The job data is merged with an 'overview_message' entry, encoded, and
# written to the "worker:<id>" key in Redis.
#
# @param message the value to store under 'overview_message'
def overview_message=(message)
  data = encode(job.merge('overview_message' => message))
  Resque.redis.set("worker:#{self}", data)
end
#path ⇒ Object
33 34 35 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 33
# Returns the fourth ':'-separated segment of the worker id (nil if absent).
def path
  segments = to_s.split(':')
  segments[3]
end
#pause ⇒ Object
262 263 264 265 266 267 268 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 262
# Sends USR2 to this worker's pid: directly with `kill` in development/test,
# otherwise through the `resque:pause_worker` capistrano task.
#
# @return whatever `system` returns for the command
def pause
  command =
    if Rails.env =~ /development|test/
      "kill -USR2 #{self.pid}"
    else
      "cd #{Rails.root}; bundle exec cap #{Rails.env} resque:pause_worker pid=#{self.pid} host=#{self.ip}"
    end
  system(command)
end
#pause_key ⇒ Object
When the worker gets the -USR2 signal, to_s may give a different value for the thread and queue portion
20 21 22 23 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 20
# Builds the Redis key that marks every worker in this process as paused.
# Only the first two id segments are used, so the key does not depend on
# the thread and queue portions of the id.
def pause_key
  host, process_id = to_s.split(':')
  "worker:#{host}:#{process_id}:all_workers:paused"
end
#pause_processing ⇒ Object
Stop processing jobs after the current one has completed (if we’re currently running one). OVERRIDE to set a redis key so the UI knows it’s paused too. Would prefer to call super but get a "no superclass method" error.
97 98 99 100 101 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 97
# Marks this worker as paused, both in memory (@paused) and in Redis, where
# the pause key is set to the current time as a string.
def pause_processing
  log('USR2 received; pausing job processing')
  @paused = true
  redis = Resque.redis
  redis.set(pause_key, Time.now.to_s)
end
#paused ⇒ Object
83 84 85 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 83
# Fetches the value stored under this worker's pause key in Redis.
def paused
  Resque.redis.get(pause_key)
end
#paused? ⇒ Boolean
are we paused? OVERRIDE so UI can tell if we’re paused
89 90 91 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 89
# True when this instance was paused in memory, or when the pause key is
# present in Redis.
def paused?
  return @paused if @paused

  paused.present?
end
#pid ⇒ Object
25 26 27 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 25
# Returns the second ':'-separated segment of the worker id (nil if absent).
def pid
  segments = to_s.split(':')
  segments[1]
end
#prune_dead_workers ⇒ Object
Looks for any workers which should be running on this server and, if they’re not, removes them from Redis.
This is a form of garbage collection. If a server is killed by a hard shutdown, power failure, or something else beyond our control, the Resque workers will not die gracefully and therefore will leave stale state information in Redis.
By checking the current Redis state against the actual environment, we can determine if Redis is old and clean it up a bit.
122 123 124 125 126 127 128 129 130 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 122
# Unregisters workers recorded for this host whose pid is not in the list
# returned by `worker_pids`.
#
# Only the host and pid segments of each worker id are needed. The pid list
# is fetched at most once per call, and only once a worker for this host
# has been seen, instead of once per worker.
def prune_dead_workers
  known_pids = nil
  Worker.all.each do |worker|
    host, worker_pid = worker.id.split(':')
    next unless host.include?(hostname)

    known_pids ||= worker_pids
    next if known_pids.include?(worker_pid)

    log! "Pruning dead worker: #{worker}"
    worker.unregister_worker
  end
end
#queue ⇒ Object
37 38 39 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 37
# Returns the fifth ':'-separated segment of the worker id (nil if absent).
def queue
  segments = to_s.split(':')
  segments[4]
end
#queues ⇒ Object
OVERRIDE for multithreaded workers
54 55 56 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 54
# Queues for the current thread, read from Thread.current[:queues].
# "*" expands to every queue known to Resque, sorted; any other value is
# split on commas.
def queues
  assigned = Thread.current[:queues]
  return Resque.queues.sort if assigned == "*"

  assigned.split(',')
end
#queues_in_pid ⇒ Object
49 50 51 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 49
# Collects the queue segment of every worker in this pid, dropping nils.
def queues_in_pid
  workers_in_pid.map(&:queue).compact
end
#quit ⇒ Object
247 248 249 250 251 252 253 254 255 256 257 258 259 260 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 247
# Asks this worker's process to stop: with `kill` in development/test,
# otherwise through the `resque:quit_worker` capistrano task.
#
# @return whatever `system` returns for the command
def quit
  unless Rails.env =~ /development|test/
    return system("cd #{Rails.root}; bundle exec cap #{Rails.env} resque:quit_worker pid=#{self.pid} host=#{self.ip} application_path=#{self.path}")
  end

  # jruby doesn't trap the -QUIT signal.
  # -TERM gracefully kills the main pid and does a -9 on the child if there is one.
  # Since jruby doesn't fork a child, the main worker is gracefully killed.
  command = RUBY_PLATFORM =~ /java/ ? "kill -TERM #{self.pid}" : "kill -QUIT #{self.pid}"
  system(command)
end
#restart ⇒ Object
278 279 280 281 282 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 278
# Quits this worker's process and starts a replacement on the same host and
# application path. The queue list is captured before quitting and is
# joined with '#'.
def restart
  queue_list = queues_in_pid.join('#')
  quit
  self.class.start(hosts: ip, queues: queue_list, application_path: path)
end
#shutdown ⇒ Object
Schedule this worker for shutdown. Will finish processing the current job. OVERRIDE for multithreaded workers
77 78 79 80 81 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 77
# Flags this worker and every thread in the process for shutdown by setting
# the :shutdown thread-local on each thread and @shutdown on this instance.
def shutdown
  log('Exiting...')
  Thread.list.each do |running_thread|
    running_thread[:shutdown] = true
  end
  @shutdown = true
end
#startup ⇒ Object
Runs all the methods needed when a worker begins its lifecycle. OVERRIDE for multithreaded workers
60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 60
# Prepares a worker for its work loop. Signal handlers are registered and
# dead workers pruned only on the main thread; every thread runs the
# :before_first_fork hook and registers its own worker.
def startup
  enable_gc_optimizations
  on_main_thread = Thread.current == Thread.main
  if on_main_thread
    register_signal_handlers
    prune_dead_workers
  end
  run_hook(:before_first_fork)
  register_worker

  # Fix buffering so we can `rake resque:work > resque.log` and
  # get output from the child in there.
  $stdout.sync = true
end
#thread ⇒ Object
29 30 31 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 29
# Returns the third ':'-separated segment of the worker id (nil if absent).
def thread
  segments = to_s.split(':')
  segments[2]
end
#to_s ⇒ Object Also known as: id
The string representation is the same as the id for this worker instance. Can be used with ‘Worker.find`.
13 14 15 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 13
# Worker id string. Returns the cached @to_s when set; otherwise builds
# "host(ip):pid:thread_object_id:path:queues" from the current process and
# the current thread's :path and :queues thread-locals.
def to_s
  return @to_s if @to_s

  current = Thread.current
  "#{hostname}(#{local_ip}):#{Process.pid}:#{current.object_id}:#{current[:path]}:#{current[:queues]}"
end
#unpause_processing ⇒ Object
Start processing jobs again after a pause. OVERRIDE to remove the redis key so the UI knows it’s unpaused too. Would prefer to call super but get a "no superclass method" error.
106 107 108 109 110 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 106
# Clears the paused state, both in memory (@paused) and in Redis, where the
# pause key is deleted.
def unpause_processing
  log('CONT received; resuming job processing')
  @paused = false
  redis = Resque.redis
  redis.del(pause_key)
end
#unregister_worker_with_pause(exception = nil) ⇒ Object
Unregisters ourself as a worker. Useful when shutting down. OVERRIDE to also remove the pause key. Would prefer to call super but get a "no superclass method" error.
135 136 137 138 139 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 135
# Unregisters the worker through the un-overridden implementation, then
# deletes this worker's pause key from Redis.
#
# @param exception passed through to unregister_worker_without_pause
def unregister_worker_with_pause(exception = nil)
  unregister_worker_without_pause(exception)
  redis = Resque.redis
  redis.del(pause_key)
end
#work(interval = 5.0, &block) ⇒ Object
This is the main workhorse method. Called on a Worker instance, it begins the worker life cycle.
The following events occur during a worker’s life cycle:
-
Startup: Signals are registered, dead workers are pruned,
and this worker is registered. -
Work loop: Jobs are pulled from a queue and processed.
-
Teardown: This worker is unregistered.
Can be passed an integer representing the polling frequency. The default is 5 seconds, but for a semi-active site you may want to use a smaller value.
Also accepts a block which will be passed the job as soon as it has completed processing. Useful for testing. OVERRIDE for multithreaded workers
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 163
# Main work loop for one worker (one per thread in the multithreaded setup).
#
# After `startup`, jobs are reserved and performed until shutdown is
# requested, either on the worker or through the :shutdown thread-local.
# While paused or idle the loop sleeps for `interval` seconds, and an
# interval of zero ends the loop as soon as there is nothing to do.
# Afterwards the worker is unregistered and the method waits until no
# worker in this pid is still processing a job.
#
# @param interval polling frequency in seconds; converted with Float()
# @param block passed through to `perform` for each job
def work(interval = 5.0, &block)
  interval = Float(interval)
  $0 = 'resque: Starting'
  startup

  loop do
    break if shutdown? || Thread.current[:shutdown]

    if !paused? && (job = reserve)
      log "got: #{job.inspect}"
      job.worker = self
      working_on job

      procline "Processing #{job.queue} since #{Time.now.to_i} [#{job.payload_class}]"
      @child = fork(job) do
        unregister_signal_handlers if term_child
        reconnect
        perform(job, &block)
        exit! unless run_at_exit_hooks
      end

      if @child
        srand # Reseeding
        procline "Forked #{@child} at #{Time.now.to_i}"
        begin
          Process.waitpid(@child)
        rescue SystemCallError
          nil
        end
        job.fail(DirtyExit.new($?.to_s)) if $?.signaled?
      else
        # No child was forked, so the job is performed in this process.
        reconnect
        perform(job, &block)
      end
      done_working
      @child = nil
    else
      break if interval.zero?

      log! "Sleeping for #{interval} seconds"
      procline(paused? ? "Paused" : "Waiting for #{@queues.join(',')}")
      sleep interval
    end
  end

  unregister_worker

  # Hang onto the process until all threads are done.
  sleep interval.to_i until all_workers_in_pid_working.blank?
rescue Exception => exception
  log "Failed to start worker : #{exception.inspect}"
  unregister_worker(exception)
end
#workers_in_pid ⇒ Object
41 42 43 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 41
# Finds every registered worker that belongs to this worker's ip and pid.
#
# The ip and pid are regex-escaped and the pid must be followed by ':', so
# pid 12 does not also match pid 123 and the dots in the address match only
# literal dots. The pattern is built once rather than once per id.
#
# @return [Array] workers returned by Resque::Worker.find, without nils
def workers_in_pid
  own_prefix = /\(#{Regexp.escape(ip.to_s)}\):#{Regexp.escape(pid.to_s)}:/
  Array(Resque.redis.smembers(:workers))
    .select { |id| id =~ own_prefix }
    .map { |id| Resque::Worker.find(id) }
    .compact
end