Class: Resque::Worker
- Inherits: Object
  - Object
  - Resque::Worker
- Includes:
- SemanticLogger::Loggable
- Defined in:
- lib/resque_manager/overrides/resque/worker.rb
Constant Summary
- @@local_ip = nil
Class Method Summary
- .start(options) ⇒ Object
- .working ⇒ Object
  The logic for mapped_mget changed: in the latest redis gem it returns keys with nil values.
Instance Method Summary
- #all_workers_in_pid_working ⇒ Object
- #continue ⇒ Object
- #ip ⇒ Object
- #local_ip ⇒ Object
- #overview_message ⇒ Object
- #overview_message=(message) ⇒ Object
- #path ⇒ Object
- #pause ⇒ Object
- #pause_key ⇒ Object
  When the worker gets the -USR2 signal, to_s may give a different value for the thread and queue portion.
- #pause_processing_with_pause_key ⇒ Object
  Stop processing jobs after the current one has completed (if we’re currently running one).
- #paused ⇒ Object
- #paused? ⇒ Boolean
  Are we paused? OVERRIDE so the UI can tell if we’re paused.
- #pid ⇒ Object
- #prune_dead_workers ⇒ Object
  Looks for any workers which should be running on this server and, if they’re not, removes them from Redis.
- #queue ⇒ Object
- #queues ⇒ Object
  OVERRIDE for multithreaded workers.
- #queues_in_pid ⇒ Object
- #quit ⇒ Object
- #restart ⇒ Object
- #shutdown_with_multithreading ⇒ Object
  Schedule this worker for shutdown.
- #shutdown_with_multithreading? ⇒ Boolean
- #startup ⇒ Object
  Runs all the methods needed when a worker begins its lifecycle.
- #thread ⇒ Object
- #to_s ⇒ Object (also: #id)
  The string representation is the same as the id for this worker instance.
- #unpause_processing_with_pause_key ⇒ Object
  Start processing jobs again after a pause. OVERRIDE to remove the Redis key so the UI knows it’s unpaused too. Would prefer to call super, but that raises a "no superclass method" error.
- #unregister_worker_with_pause(exception = nil) ⇒ Object
  Unregisters ourselves as a worker.
- #work_with_multithreading(interval = 5.0, &block) ⇒ Object
  This is the main workhorse method.
- #workers_in_pid ⇒ Object
Class Method Details
.start(options) ⇒ Object
    # File 'lib/resque_manager/overrides/resque/worker.rb', line 199
    def self.start(options)
      ips = options[:hosts]
      application_path = options[:application_path]
      queues = options[:queues]
      if Rails.env =~ /development|test/
        Thread.new(application_path, queues) { |application_path, queue|
          system("cd #{application_path || '.'} && bundle exec #{ResqueManager.resque_worker_rake || 'rake'} RAILS_ENV=#{Rails.env} QUEUE=#{queue} resque:work")
        }
      else
        Thread.new(ips, application_path, queues) { |ip_list, application_path, queue|
          system("cd #{Rails.root} && bundle exec cap #{Rails.env} resque:work host=#{ip_list} application_path=#{application_path} queue=#{queue}")
        }
      end
    end
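In development and test, `.start` boils down to assembling one shell command and handing it to `system`. The sketch below shows only that string assembly. The helper name `worker_command` and its keyword arguments are hypothetical stand-ins for `Rails.env` and `ResqueManager.resque_worker_rake`, not part of resque_manager.

```ruby
# Hypothetical helper (not part of resque_manager): builds the command string
# that .start passes to `system` in development/test. Nothing is executed here.
def worker_command(env:, queue:, application_path: nil, rake: nil)
  "cd #{application_path || '.'} && bundle exec #{rake || 'rake'} " \
    "RAILS_ENV=#{env} QUEUE=#{queue} resque:work"
end
```

With no path or rake override, the command falls back to the current directory and plain `rake`, matching the `||` defaults in the method above.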
.working ⇒ Object
The logic for mapped_mget changed: in the latest redis gem it returns keys with nil values.
    # File 'lib/resque_manager/overrides/resque/worker.rb', line 181
    def self.working
      names = all
      return [] unless names.any?
      names.map! { |name| "worker:#{name}" }
      Resque.redis.mapped_mget(*names).map do |key, value|
        find key.sub("worker:", '') unless value.nil?
      end.compact
    end
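The nil filtering in `.working` can be tried without Redis. In this sketch a plain Hash stands in for the result of `Resque.redis.mapped_mget`. The helper name `working_ids` is hypothetical, and it returns id strings rather than calling `find`.

```ruby
# Hypothetical helper: `mget_result` stands in for the hash returned by
# Resque.redis.mapped_mget, where workers without a current job map to nil.
def working_ids(mget_result)
  mget_result.map { |key, value| key.sub("worker:", "") unless value.nil? }.compact
end
```

Keys whose value is nil produce nil in the `map` step, and `compact` drops them, so only workers with a stored job payload remain.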
Instance Method Details
#all_workers_in_pid_working ⇒ Object
    # File 'lib/resque_manager/overrides/resque/worker.rb', line 144
    def all_workers_in_pid_working
      workers_in_pid.select { |w| (hash = w.processing) && !hash.empty? }
    end
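The `select` above keeps a worker only when its `processing` hash is present and non-empty. A minimal sketch of that predicate, using a hypothetical helper `busy` that takes the hashes directly instead of worker objects:

```ruby
# Hypothetical helper: each element of `processing_hashes` stands in for the
# value of Worker#processing (nil or {} when the worker is idle).
def busy(processing_hashes)
  processing_hashes.select { |hash| hash && !hash.empty? }
end
```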
#continue ⇒ Object
    # File 'lib/resque_manager/overrides/resque/worker.rb', line 233
    def continue
      if Rails.env =~ /development|test/
        system("kill -CONT #{self.pid}")
      else
        system("cd #{Rails.root} && bundle exec cap #{Rails.env} resque:continue_worker pid=#{self.pid} host=#{self.ip}")
      end
    end
#ip ⇒ Object
    # File 'lib/resque_manager/overrides/resque/worker.rb', line 48
    def ip
      to_s.split(':').first[/\b(?:\d{1,3}\.){3}\d{1,3}\b/]
    end
#local_ip ⇒ Object
    # File 'lib/resque_manager/overrides/resque/worker.rb', line 10
    def local_ip
      @@local_ip ||= IPSocket.getaddress(Socket.gethostname)
    end
#overview_message ⇒ Object
    # File 'lib/resque_manager/overrides/resque/worker.rb', line 195
    def overview_message
      job['overview_message']
    end
#overview_message=(message) ⇒ Object
    # File 'lib/resque_manager/overrides/resque/worker.rb', line 190
    def overview_message=(message)
      data = encode(job.merge('overview_message' => message))
      Resque.redis.set("worker:#{self}", data)
    end
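The setter merges the message into the current job payload and writes the encoded result back under the worker's key. The sketch below covers only the merge-and-encode step. It assumes JSON as a stand-in for Resque's `encode`, and the helper name `with_overview_message` is hypothetical.

```ruby
require "json"

# Hypothetical helper: merges a status message into a job payload and
# serializes it. Using JSON in place of Resque's `encode` is an assumption.
def with_overview_message(job, message)
  JSON.generate(job.merge("overview_message" => message))
end
```

Because `merge` returns a new hash, the original job payload is left untouched and the existing keys are written back alongside the message.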
#path ⇒ Object
    # File 'lib/resque_manager/overrides/resque/worker.rb', line 36
    def path
      to_s.split(':').fourth
    end
#pause ⇒ Object
    # File 'lib/resque_manager/overrides/resque/worker.rb', line 225
    def pause
      if Rails.env =~ /development|test/
        system("kill -USR2 #{self.pid}")
      else
        system("cd #{Rails.root} && bundle exec cap #{Rails.env} resque:pause_worker pid=#{self.pid} host=#{self.ip}")
      end
    end
#pause_key ⇒ Object
When the worker gets the -USR2 signal, to_s may give a different value for the thread and queue portion.
    # File 'lib/resque_manager/overrides/resque/worker.rb', line 23
    def pause_key
      key = to_s.split(':')
      "worker:#{key.first}:#{key.second}:all_workers:paused"
    end
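Because the key uses only the host and pid segments, every thread in one process maps to the same pause key, whatever its thread or queue portion says. A minimal sketch, with a hypothetical helper `pause_key_for` that takes the id string as an argument and uses plain destructuring instead of ActiveSupport's `second`:

```ruby
# Hypothetical helper mirroring #pause_key for an arbitrary worker id string.
def pause_key_for(worker_id)
  host, pid = worker_id.split(":") # the remaining segments are ignored
  "worker:#{host}:#{pid}:all_workers:paused"
end
```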
#pause_processing_with_pause_key ⇒ Object
Stop processing jobs after the current one has completed (if we’re currently running one). OVERRIDE to set a Redis key so the UI knows it’s paused too.
    # File 'lib/resque_manager/overrides/resque/worker.rb', line 99
    def pause_processing_with_pause_key
      pause_processing_without_pause_key
      Resque.redis.set(pause_key, Time.now.to_s)
    end
#paused ⇒ Object
    # File 'lib/resque_manager/overrides/resque/worker.rb', line 86
    def paused
      Resque.redis.get pause_key
    end
#paused? ⇒ Boolean
Are we paused? OVERRIDE so the UI can tell if we’re paused.
    # File 'lib/resque_manager/overrides/resque/worker.rb', line 92
    def paused?
      @paused || paused.present?
    end
#pid ⇒ Object
    # File 'lib/resque_manager/overrides/resque/worker.rb', line 28
    def pid
      to_s.split(':').second
    end
#prune_dead_workers ⇒ Object
Looks for any workers which should be running on this server and, if they’re not, removes them from Redis.
This is a form of garbage collection. If a server is killed by a hard shutdown, power failure, or something else beyond our control, the Resque workers will not die gracefully and therefore will leave stale state information in Redis.
By checking the current Redis state against the actual environment, we can determine whether the state in Redis is stale and clean it up a bit.
    # File 'lib/resque_manager/overrides/resque/worker.rb', line 124
    def prune_dead_workers
      Worker.all.each do |worker|
        host, pid, thread, path, queues = worker.id.split(':')
        next unless host.include?(hostname)
        next if worker_pids.include?(pid)
        log! "Pruning dead worker: #{worker}"
        worker.unregister_worker
      end
    end
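The pruning rule has two parts: ignore workers registered for other hosts, and ignore workers whose pid is still alive. The sketch below applies the same two checks to plain strings. The helper name `dead_worker_ids` is hypothetical, and `live_pids` stands in for the worker's `worker_pids`.

```ruby
# Hypothetical helper: returns the ids registered for `hostname` whose pid is
# no longer alive. `live_pids` is a list of pid strings.
def dead_worker_ids(ids, hostname, live_pids)
  ids.reject do |id|
    host, pid = id.split(":")
    !host.include?(hostname) || live_pids.include?(pid)
  end
end
```

Pids are compared as strings here because they come from splitting the id, which is also why the original compares against `worker_pids` without converting.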
#queue ⇒ Object
    # File 'lib/resque_manager/overrides/resque/worker.rb', line 40
    def queue
      to_s.split(':').fifth
    end
#queues ⇒ Object
OVERRIDE for multithreaded workers.
    # File 'lib/resque_manager/overrides/resque/worker.rb', line 57
    def queues
      Thread.current[:queues] == "*" ? Resque.queues.sort : Thread.current[:queues].split(',')
    end
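The queue spec is either the wildcard `"*"`, meaning every known queue in sorted order, or a comma-separated list that is used in the order given. A minimal sketch with a hypothetical helper `resolve_queues`, where `all_queues` stands in for `Resque.queues` and `spec` for `Thread.current[:queues]`:

```ruby
# Hypothetical helper: resolves a queue spec string into a list of queue names.
def resolve_queues(spec, all_queues)
  spec == "*" ? all_queues.sort : spec.split(",")
end
```

An explicit list is not sorted, so `"low,high"` keeps `low` first.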
#queues_in_pid ⇒ Object
    # File 'lib/resque_manager/overrides/resque/worker.rb', line 52
    def queues_in_pid
      workers_in_pid.collect { |w| w.queue }.compact
    end
#quit ⇒ Object
    # File 'lib/resque_manager/overrides/resque/worker.rb', line 210
    def quit
      if Rails.env =~ /development|test/
        if RUBY_PLATFORM =~ /java/
          # jruby doesn't trap the -QUIT signal
          # -TERM gracefully kills the main pid and does a -9 on the child if there is one.
          # Since jruby doesn't fork a child, the main worker is gracefully killed.
          system("kill -TERM #{self.pid}")
        else
          system("kill -QUIT #{self.pid}")
        end
      else
        system("cd #{Rails.root} && bundle exec cap #{Rails.env} resque:quit_worker pid=#{self.pid} host=#{self.ip} application_path=#{self.path}")
      end
    end
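The local branch of `#quit` picks its signal from the platform string: TERM on JRuby, QUIT everywhere else. A small sketch of that choice, with a hypothetical helper `quit_signal` that takes the platform as an argument so both cases can be exercised on any interpreter:

```ruby
# Hypothetical helper: `platform` stands in for RUBY_PLATFORM.
def quit_signal(platform)
  platform =~ /java/ ? "TERM" : "QUIT"
end
```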
#restart ⇒ Object
    # File 'lib/resque_manager/overrides/resque/worker.rb', line 241
    def restart
      queues = self.queues_in_pid.join('#')
      quit
      self.class.start(hosts: self.ip, queues: queues, application_path: self.path)
    end
#shutdown_with_multithreading ⇒ Object
Schedule this worker for shutdown. Will finish processing the current job. OVERRIDE for multithreaded workers.
    # File 'lib/resque_manager/overrides/resque/worker.rb', line 80
    def shutdown_with_multithreading
      Thread.list.each { |t| t[:shutdown] = true }
      shutdown_without_multithreading
    end
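The shutdown override works by setting a `:shutdown` flag on every thread, which each worker thread then reads through `Thread.current[:shutdown]`. The sketch below demonstrates that signalling pattern with plain threads. The method name `stop_all_threads` and the polling loop are illustrative assumptions, not resque_manager code.

```ruby
# Hypothetical demo: starts `count` threads that spin until their :shutdown
# flag is set, then flags every live thread at once.
def stop_all_threads(count)
  ready = Queue.new
  workers = Array.new(count) do
    Thread.new do
      ready << true                              # report that this thread is running
      sleep 0.01 until Thread.current[:shutdown] # poll the per-thread flag
      :stopped
    end
  end
  count.times { ready.pop }                      # wait until every thread has started
  Thread.list.each { |t| t[:shutdown] = true }   # same call as in the override
  workers.map(&:value)                           # join the threads, collect results
end
```

Note that `Thread.list` includes the calling thread, so the flag is also set on the thread that requests the shutdown.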
#shutdown_with_multithreading? ⇒ Boolean
    # File 'lib/resque_manager/overrides/resque/worker.rb', line 175
    def shutdown_with_multithreading?
      shutdown_without_multithreading? || Thread.current[:shutdown]
    end
#startup ⇒ Object
Runs all the methods needed when a worker begins its lifecycle. OVERRIDE for multithreaded workers.
    # File 'lib/resque_manager/overrides/resque/worker.rb', line 63
    def startup
      enable_gc_optimizations
      if Thread.current == Thread.main
        register_signal_handlers
        prune_dead_workers
      end
      run_hook :before_first_fork
      register_worker

      # Fix buffering so we can `rake resque:work > resque.log` and
      # get output from the child in there.
      $stdout.sync = true
    end
#thread ⇒ Object
    # File 'lib/resque_manager/overrides/resque/worker.rb', line 32
    def thread
      to_s.split(':').third
    end
#to_s ⇒ Object Also known as: id
The string representation is the same as the id for this worker instance. Can be used with `Worker.find`.
    # File 'lib/resque_manager/overrides/resque/worker.rb', line 16
    def to_s
      @to_s || "#{hostname}(#{local_ip}):#{Process.pid}:#{Thread.current.object_id}:#{Thread.current[:path]}:#{Thread.current[:queues]}"
    end
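The id therefore has five colon-separated segments, `hostname(ip):pid:thread:path:queues`, and the `#ip`, `#pid`, `#thread`, `#path` and `#queue` accessors each read one of them. The sketch below splits a sample id the same way. The helper name `parse_worker_id` and the sample values are hypothetical, and plain destructuring replaces the ActiveSupport accessors (`second`, `third`, and so on) so it runs without Rails.

```ruby
# Hypothetical helper: splits a worker id of the form
# "hostname(ip):pid:thread:path:queues" into named fields.
def parse_worker_id(id)
  host, pid, thread, path, queue = id.split(":")
  {
    host: host,
    ip: host[/\b(?:\d{1,3}\.){3}\d{1,3}\b/], # same pattern as #ip
    pid: pid,
    thread: thread,
    path: path,
    queue: queue
  }
end
```

This assumes, as the accessors do, that neither the path nor the queue list contains a colon.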
#unpause_processing_with_pause_key ⇒ Object
Start processing jobs again after a pause. OVERRIDE to remove the Redis key so the UI knows it’s unpaused too. Would prefer to call super, but that raises a "no superclass method" error.
    # File 'lib/resque_manager/overrides/resque/worker.rb', line 108
    def unpause_processing_with_pause_key
      unpause_processing_without_pause_key
      Resque.redis.del(pause_key)
    end
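Taken together, the pause overrides treat the pause key as a flag: pausing stores a timestamp under it, unpausing deletes it, and the paused check reports whether anything is stored. The sketch below walks through that lifecycle with a plain Hash standing in for Redis. The helper names are hypothetical, and the in-process `@paused` flag that `#paused?` also consults is left out.

```ruby
# Hypothetical helpers: `store` is a plain Hash standing in for Redis.
def mark_paused(store, key, now = Time.now)
  store[key] = now.to_s # the stored value only needs to be non-blank
end

def mark_unpaused(store, key)
  store.delete(key)
end

def paused_in_store?(store, key)
  !store[key].to_s.empty?
end
```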
#unregister_worker_with_pause(exception = nil) ⇒ Object
Unregisters ourselves as a worker. Useful when shutting down. OVERRIDE to also remove the pause key. Would prefer to call super, but that raises a "no superclass method" error.
    # File 'lib/resque_manager/overrides/resque/worker.rb', line 137
    def unregister_worker_with_pause(exception = nil)
      unregister_worker_without_pause(exception)
      Resque.redis.del(pause_key)
    end
#work_with_multithreading(interval = 5.0, &block) ⇒ Object
This is the main workhorse method. Called on a Worker instance, it begins the worker life cycle.
The following events occur during a worker’s life cycle:
- Startup: Signals are registered, dead workers are pruned, and this worker is registered.
- Work loop: Jobs are pulled from a queue and processed.
- Teardown: This worker is unregistered.
Can be passed a number representing the polling interval in seconds. The default is 5 seconds, but for a semi-active site you may want to use a smaller value.
Also accepts a block which will be passed the job as soon as it has completed processing. Useful for testing. OVERRIDE for multithreaded workers.
    # File 'lib/resque_manager/overrides/resque/worker.rb', line 165
    def work_with_multithreading(interval = 5.0, &block)
      work_without_multithreading(interval, &block)
      loop do
        # hang onto the process until all threads are done
        break if all_workers_in_pid_working.blank?
        sleep interval.to_i
      end
    end
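After the normal work loop returns, the override keeps the process alive by polling until no worker in the pid is still processing a job. The sketch below isolates that wait loop. The method name `wait_until_idle` is hypothetical, and the block stands in for the `all_workers_in_pid_working` check.

```ruby
# Hypothetical helper: polls the given block every `interval` seconds until it
# reports that nothing is busy, and returns how many times it had to wait.
def wait_until_idle(interval)
  polls = 0
  loop do
    break unless yield # the block returns true while any thread is still busy
    polls += 1
    sleep interval
  end
  polls
end
```

The original converts the interval with `to_i` before sleeping, so a fractional interval below one second would turn the wait into a busy loop. The sketch passes the interval through unchanged.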