Class: Resque::Worker
- Inherits:
-
Object
- Object
- Resque::Worker
- Defined in:
- lib/resque_manager/overrides/resque/worker.rb
Constant Summary collapse
- @@local_ip =
nil
Class Method Summary collapse
- .start(options) ⇒ Object
-
.working ⇒ Object
logic for mappged_mget changed where it returns keys with nil values in latest redis gem.
Instance Method Summary collapse
- #all_workers_in_pid_working ⇒ Object
- #continue ⇒ Object
- #ip ⇒ Object
- #local_ip ⇒ Object
- #overview_message ⇒ Object
- #overview_message=(message) ⇒ Object
- #path ⇒ Object
- #pause ⇒ Object
-
#pause_key ⇒ Object
When the worker gets the -USR2 signal, to_s may give a different value for the thread and queue portion.
-
#pause_processing ⇒ Object
Stop processing jobs after the current one has completed (if we’re currently running one).
- #paused ⇒ Object
-
#paused? ⇒ Boolean
are we paused? OVERRIDE so UI can tell if we’re paused.
- #pid ⇒ Object
-
#prune_dead_workers ⇒ Object
Looks for any workers which should be running on this server and, if they’re not, removes them from Redis.
- #queue ⇒ Object
-
#queues ⇒ Object
OVERRIDE for multithreaded workers.
- #queues_in_pid ⇒ Object
- #quit ⇒ Object
- #restart ⇒ Object
-
#shutdown ⇒ Object
Schedule this worker for shutdown.
-
#startup ⇒ Object
Runs all the methods needed when a worker begins its lifecycle.
- #thread ⇒ Object
-
#to_s ⇒ Object
(also: #id)
The string representation is the same as the id for this worker instance.
-
#unpause_processing ⇒ Object
Start processing jobs again after a pause OVERRIDE to set remove redis key so UI knows it’s unpaused too Would prefer to call super but get no superclass method error.
-
#unregister_worker_with_pause(exception = nil) ⇒ Object
Unregisters ourself as a worker.
-
#work(interval = 5.0, &block) ⇒ Object
This is the main workhorse method.
- #workers_in_pid ⇒ Object
Class Method Details
.start(options) ⇒ Object
242 243 244 245 246 247 248 249 250 251 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 242 def self.start() ips = [:hosts] application_path = [:application_path] queues = [:queues] if Rails.env =~ /development|test/ Thread.new(application_path, queues) { |application_path, queue| system("cd #{application_path || '.'}; bundle exec #{ResqueManager.resque_worker_rake || 'rake'} RAILS_ENV=#{Rails.env} QUEUE=#{queue} resque:work") } else Thread.new(ips, application_path, queues) { |ip_list, application_path, queue| system("cd #{Rails.root}; bundle exec cap #{Rails.env} resque:work host=#{ip_list} application_path=#{application_path} queue=#{queue}") } end end |
.working ⇒ Object
logic for mappged_mget changed where it returns keys with nil values in latest redis gem.
224 225 226 227 228 229 230 231 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 224 def self.working names = all return [] unless names.any? names.map! { |name| "worker:#{name}" } Resque.redis.mapped_mget(*names).map do |key, value| find key.sub("worker:", '') unless value.nil? end.compact end |
Instance Method Details
#all_workers_in_pid_working ⇒ Object
147 148 149 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 147 def all_workers_in_pid_working workers_in_pid.select { |w| (hash = w.processing) && !hash.empty? } end |
#continue ⇒ Object
276 277 278 279 280 281 282 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 276 def continue if Rails.env =~ /development|test/ system("kill -CONT #{self.pid}") else system("cd #{Rails.root}; bundle exec cap #{Rails.env} resque:continue_worker pid=#{self.pid} host=#{self.ip}") end end |
#ip ⇒ Object
50 51 52 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 50 def ip to_s.split(':').first[/\b(?:\d{1,3}\.){3}\d{1,3}\b/] end |
#local_ip ⇒ Object
7 8 9 10 11 12 13 14 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 7 def local_ip @@local_ip ||= begin UDPSocket.open do |s| s.connect 'google.com', 1 s.addr.last end end end |
#overview_message ⇒ Object
238 239 240 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 238 def job['overview_message'] end |
#overview_message=(message) ⇒ Object
233 234 235 236 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 233 def () data = encode(job.merge('overview_message' => )) Resque.redis.set("worker:#{self}", data) end |
#path ⇒ Object
38 39 40 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 38 def path to_s.split(':').fourth end |
#pause ⇒ Object
268 269 270 271 272 273 274 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 268 def pause if Rails.env =~ /development|test/ system("kill -USR2 #{self.pid}") else system("cd #{Rails.root}; bundle exec cap #{Rails.env} resque:pause_worker pid=#{self.pid} host=#{self.ip}") end end |
#pause_key ⇒ Object
When the worker gets the -USR2 signal, to_s may give a different value for the thread and queue portion
25 26 27 28 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 25 def pause_key key = to_s.split(':') "worker:#{key.first}:#{key.second}:all_workers:paused" end |
#pause_processing ⇒ Object
Stop processing jobs after the current one has completed (if we’re currently running one). OVERRIDE to set a redis key so UI knows it’s paused too Would prefer to call super but get no superclass method error
102 103 104 105 106 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 102 def pause_processing log "USR2 received; pausing job processing" @paused = true Resque.redis.set(pause_key, Time.now.to_s) end |
#paused ⇒ Object
88 89 90 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 88 def paused Resque.redis.get pause_key end |
#paused? ⇒ Boolean
are we paused? OVERRIDE so UI can tell if we’re paused
94 95 96 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 94 def paused? @paused || paused.present? end |
#pid ⇒ Object
30 31 32 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 30 def pid to_s.split(':').second end |
#prune_dead_workers ⇒ Object
Looks for any workers which should be running on this server and, if they’re not, removes them from Redis.
This is a form of garbage collection. If a server is killed by a hard shutdown, power failure, or something else beyond our control, the Resque workers will not die gracefully and therefor will leave stale state information in Redis.
By checking the current Redis state against the actual environment, we can determine if Redis is old and clean it up a bit.
127 128 129 130 131 132 133 134 135 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 127 def prune_dead_workers Worker.all.each do |worker| host, pid, thread, path, queues = worker.id.split(':') next unless host.include?(hostname) next if worker_pids.include?(pid) log! "Pruning dead worker: #{worker}" worker.unregister_worker end end |
#queue ⇒ Object
42 43 44 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 42 def queue to_s.split(':').last end |
#queues ⇒ Object
OVERRIDE for multithreaded workers
59 60 61 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 59 def queues Thread.current[:queues] == "*" ? Resque.queues.sort : Thread.current[:queues].split(',') end |
#queues_in_pid ⇒ Object
54 55 56 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 54 def queues_in_pid workers_in_pid.collect { |w| w.queue } end |
#quit ⇒ Object
253 254 255 256 257 258 259 260 261 262 263 264 265 266 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 253 def quit if Rails.env =~ /development|test/ if RUBY_PLATFORM =~ /java/ #jruby doesn't trap the -QUIT signal #-TERM gracefully kills the main pid and does a -9 on the child if there is one. #Since jruby doesn't fork a child, the main worker is gracefully killed. system("kill -TERM #{self.pid}") else system("kill -QUIT #{self.pid}") end else system("cd #{Rails.root}; bundle exec cap #{Rails.env} resque:quit_worker pid=#{self.pid} host=#{self.ip} application_path=#{self.path}") end end |
#restart ⇒ Object
284 285 286 287 288 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 284 def restart queues = self.queues_in_pid.join('#') quit self.class.start(hosts: self.ip, queues: queues, application_path: self.path) end |
#shutdown ⇒ Object
Schedule this worker for shutdown. Will finish processing the current job. OVERRIDE for multithreaded workers
82 83 84 85 86 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 82 def shutdown log 'Exiting...' Thread.list.each { |t| t[:shutdown] = true } @shutdown = true end |
#startup ⇒ Object
Runs all the methods needed when a worker begins its lifecycle. OVERRIDE for multithreaded workers
65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 65 def startup enable_gc_optimizations if Thread.current == Thread.main register_signal_handlers prune_dead_workers end run_hook :before_first_fork register_worker # Fix buffering so we can `rake resque:work > resque.log` and # get output from the child in there. $stdout.sync = true end |
#thread ⇒ Object
34 35 36 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 34 def thread to_s.split(':').third end |
#to_s ⇒ Object Also known as: id
The string representation is the same as the id for this worker instance. Can be used with ‘Worker.find`.
18 19 20 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 18 def to_s @to_s || "#{hostname}(#{local_ip}):#{Process.pid}:#{Thread.current.object_id}:#{Thread.current[:path]}:#{Thread.current[:queues]}" end |
#unpause_processing ⇒ Object
Start processing jobs again after a pause OVERRIDE to set remove redis key so UI knows it’s unpaused too Would prefer to call super but get no superclass method error
111 112 113 114 115 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 111 def unpause_processing log "CONT received; resuming job processing" @paused = false Resque.redis.del(pause_key) end |
#unregister_worker_with_pause(exception = nil) ⇒ Object
Unregisters ourself as a worker. Useful when shutting down. OVERRIDE to also remove the pause key Would prefer to call super but get no superclass method error
140 141 142 143 144 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 140 def unregister_worker_with_pause(exception = nil) unregister_worker_without_pause(exception) Resque.redis.del(pause_key) end |
#work(interval = 5.0, &block) ⇒ Object
This is the main workhorse method. Called on a Worker instance, it begins the worker life cycle.
The following events occur during a worker’s life cycle:
-
Startup: Signals are registered, dead workers are pruned,
and this worker is registered. -
Work loop: Jobs are pulled from a queue and processed.
-
Teardown: This worker is unregistered.
Can be passed an integer representing the polling frequency. The default is 5 seconds, but for a semi-active site you may want to use a smaller value.
Also accepts a block which will be passed the job as soon as it has completed processing. Useful for testing. OVERRIDE for multithreaded workers
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 168 def work(interval = 5.0, &block) interval = Float(interval) $0 = "resque: Starting" startup loop do break if shutdown? || Thread.current[:shutdown] if not paused? and job = reserve log "got: #{job.inspect}" job.worker = self working_on job procline "Processing #{job.queue} since #{Time.now.to_i} [#{job.payload_class}]" if @child = fork(job) do unregister_signal_handlers if term_child reconnect perform(job, &block) exit! unless run_at_exit_hooks end srand # Reseeding procline "Forked #{@child} at #{Time.now.to_i}" begin Process.waitpid(@child) rescue SystemCallError nil end job.fail(DirtyExit.new($?.to_s)) if $?.signaled? else reconnect perform(job, &block) end done_working @child = nil else break if interval.zero? log! "Sleeping for #{interval} seconds" procline paused? ? "Paused" : "Waiting for #{@queues.join(',')}" sleep interval end end unregister_worker loop do #hang onto the process until all threads are done break if all_workers_in_pid_working.blank? sleep interval.to_i end rescue Exception => exception log "Failed to start worker : #{exception.inspect}" unregister_worker(exception) end |
#workers_in_pid ⇒ Object
46 47 48 |
# File 'lib/resque_manager/overrides/resque/worker.rb', line 46 def workers_in_pid Array(Resque.redis.smembers(:workers)).select { |id| id =~ /\(#{ip}\):#{pid}/ }.map { |id| Resque::Worker.find(id) }.compact end |