Class: Resqued::Master
- Inherits:
-
Object
- Object
- Resqued::Master
- Includes:
- Logging, Pidfile, ProclineVersion, Sleepy
- Defined in:
- lib/resqued/master.rb
Overview
The master process.
-
Spawns a listener.
-
Tracks all work. (IO pipe from listener.)
-
Handles signals.
Constant Summary collapse
- SIGNALS =
[ :HUP, :INT, :USR2, :CONT, :TERM, :QUIT ]
- OPTIONAL_SIGNALS =
[ :INFO ]
- OTHER_SIGNALS =
[:CHLD, 'EXIT']
- TRAPS =
SIGNALS + OPTIONAL_SIGNALS + OTHER_SIGNALS
- SIGNAL_QUEUE =
[]
Instance Method Summary collapse
-
#all_listeners ⇒ Object
Private: All the ListenerProxy objects.
-
#dump_object_counts ⇒ Object
Private.
-
#go_ham ⇒ Object
Private: dat main loop.
-
#initialize(options) ⇒ Master
constructor
A new instance of Master.
- #install_signal_handlers ⇒ Object
- #kill_all_listeners(signal) ⇒ Object
- #kill_listener(signal, listener) ⇒ Object
-
#listener_pids ⇒ Object
Private: Map listener pids to ListenerProxy objects.
-
#listener_running(listener) ⇒ Object
Listener message: A listener finished booting, and is ready to start workers.
- #listener_status(listener, status) ⇒ Object
- #next_listener_id ⇒ Object
- #no_more_unexpected_exits ⇒ Object
-
#prepare_new_listener ⇒ Object
Private: Spin up a new listener.
- #read_listeners ⇒ Object
- #reap_all_listeners(waitpid_flags = 0) ⇒ Object
- #report_unexpected_exits ⇒ Object
-
#run(ready_pipe = nil) ⇒ Object
Public: Starts the master process.
- #start_listener ⇒ Object
- #status_message(type, pid, status) ⇒ Object
- #wait_for_workers ⇒ Object
-
#worker_finished(pid) ⇒ Object
Listener message: A worker just stopped working.
-
#worker_started(pid) ⇒ Object
Listener message: A worker just started working.
- #worker_status(pid, status) ⇒ Object
- #write_procline ⇒ Object
- #yawn(duration) ⇒ Object
Methods included from Sleepy
Methods included from ProclineVersion
Methods included from Pidfile
#remove_pidfile, #with_pidfile, #write_pidfile
Methods included from Logging
build_logger, close_log, #log, log_file, log_file=, #log_to_stdout?, logger, logging_io, #reopen_logs
Constructor Details
#initialize(options) ⇒ Master
Returns a new instance of Master.
19 20 21 22 23 24 25 |
# File 'lib/resqued/master.rb', line 19 def initialize() @config_paths = .fetch(:config_paths) @pidfile = .fetch(:master_pidfile) { nil } @status_pipe = .fetch(:status_pipe) { nil } @listener_backoff = Backoff.new @listeners_created = 0 end |
Instance Method Details
#all_listeners ⇒ Object
Private: All the ListenerProxy objects.
108 109 110 |
# File 'lib/resqued/master.rb', line 108 def all_listeners listener_pids.values end |
#dump_object_counts ⇒ Object
Private.
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/resqued/master.rb', line 76 def dump_object_counts log GC.stat.inspect counts = {} total = 0 ObjectSpace.each_object do |o| count = counts[o.class.name] || 0 counts[o.class.name] = count + 1 total += 1 end top = 10 log "#{total} objects. top #{top}:" counts.sort_by { |name, count| count }.reverse.each_with_index do |(name, count), i| if i < top diff = "" if last = @last_counts && @last_counts[name] diff = " (#{'%+d' % (count - last)})" end log " #{count} #{name}#{diff}" end end @last_counts = counts log GC.stat.inspect rescue => e log "Error while counting objects: #{e}" end |
#go_ham ⇒ Object
Private: dat main loop.
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/resqued/master.rb', line 43 def go_ham loop do read_listeners reap_all_listeners(Process::WNOHANG) start_listener unless @paused case signal = SIGNAL_QUEUE.shift when nil yawn(@listener_backoff.how_long? || 30.0) when :INFO dump_object_counts when :HUP reopen_logs log "Restarting listener with new configuration and application." prepare_new_listener when :USR2 log "Pause job processing" @paused = true kill_listener(:QUIT, @current_listener) @current_listener = nil when :CONT log "Resume job processing" @paused = false kill_all_listeners(:CONT) when :INT, :TERM, :QUIT log "Shutting down..." kill_all_listeners(signal) wait_for_workers break end end end |
#install_signal_handlers ⇒ Object
220 221 222 223 224 |
# File 'lib/resqued/master.rb', line 220 def install_signal_handlers trap(:CHLD) { awake } SIGNALS.each { |signal| trap(signal) { SIGNAL_QUEUE << signal ; awake } } OPTIONAL_SIGNALS.each { |signal| trap(signal) { SIGNAL_QUEUE << signal ; awake } rescue nil } end |
#kill_all_listeners(signal) ⇒ Object
182 183 184 185 186 |
# File 'lib/resqued/master.rb', line 182 def kill_all_listeners(signal) all_listeners.each do |l| l.kill(signal) end end |
#kill_listener(signal, listener) ⇒ Object
178 179 180 |
# File 'lib/resqued/master.rb', line 178 def kill_listener(signal, listener) listener.kill(signal) if listener end |
#listener_pids ⇒ Object
Private: Map listener pids to ListenerProxy objects.
103 104 105 |
# File 'lib/resqued/master.rb', line 103 def listener_pids @listener_pids ||= {} end |
#listener_running(listener) ⇒ Object
Listener message: A listener finished booting, and is ready to start workers.
Promotes a booting listener to be the current listener.
150 151 152 153 154 155 156 157 158 159 160 161 |
# File 'lib/resqued/master.rb', line 150 def listener_running(listener) listener_status(listener, 'ready') if listener == @current_listener kill_listener(:QUIT, @last_good_listener) @last_good_listener = nil else # This listener didn't receive the last SIGQUIT we sent. # (It was probably sent before the listener had set up its traps.) # So kill it again. We have moved on. kill_listener(:QUIT, listener) end end |
#listener_status(listener, status) ⇒ Object
249 250 251 252 253 |
# File 'lib/resqued/master.rb', line 249 def listener_status(listener, status) if listener && listener.pid ('listener', listener.pid, status) end end |
#next_listener_id ⇒ Object
122 123 124 |
# File 'lib/resqued/master.rb', line 122 def next_listener_id @listeners_created += 1 end |
#no_more_unexpected_exits ⇒ Object
237 238 239 |
# File 'lib/resqued/master.rb', line 237 def no_more_unexpected_exits trap('EXIT', 'DEFAULT') end |
#prepare_new_listener ⇒ Object
Private: Spin up a new listener.
The old one will be killed when the new one is ready for workers.
166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/resqued/master.rb', line 166 def prepare_new_listener if @last_good_listener # The last_good_listener is still running because we got another HUP before the new listener finished booting. # Keep the last_good_listener (where all the workers are) and kill the booting current_listener. We'll start a new one. kill_listener(:QUIT, @current_listener) else @last_good_listener = @current_listener end # Indicate to `start_listener` that it should start a new listener. @current_listener = nil end |
#read_listeners ⇒ Object
126 127 128 129 130 |
# File 'lib/resqued/master.rb', line 126 def read_listeners all_listeners.each do |l| l.read_worker_status(:on_activity => self) end end |
#reap_all_listeners(waitpid_flags = 0) ⇒ Object
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 |
# File 'lib/resqued/master.rb', line 192 def reap_all_listeners(waitpid_flags = 0) begin lpid, status = Process.waitpid2(-1, waitpid_flags) if lpid log "Listener exited #{status}" if @current_listener && @current_listener.pid == lpid @listener_backoff.died @current_listener = nil end dead_listener = listener_pids.delete(lpid) listener_status dead_listener, 'stop' dead_listener.dispose write_procline else return end rescue Errno::ECHILD return end while true end |
#report_unexpected_exits ⇒ Object
226 227 228 229 230 231 232 233 234 235 |
# File 'lib/resqued/master.rb', line 226 def report_unexpected_exits trap('EXIT') do log("EXIT #{$!.inspect}") if $! $!.backtrace.each do |line| log(line) end end end end |
#run(ready_pipe = nil) ⇒ Object
Public: Starts the master process.
28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/resqued/master.rb', line 28 def run(ready_pipe = nil) report_unexpected_exits with_pidfile(@pidfile) do write_procline install_signal_handlers if ready_pipe ready_pipe.syswrite($$.to_s) ready_pipe.close rescue nil end go_ham end no_more_unexpected_exits end |
#start_listener ⇒ Object
112 113 114 115 116 117 118 119 120 |
# File 'lib/resqued/master.rb', line 112 def start_listener return if @current_listener || @listener_backoff.wait? @current_listener = ListenerProxy.new(:config_paths => @config_paths, :old_workers => all_listeners.map { |l| l.running_workers }.flatten, :listener_id => next_listener_id) @current_listener.run listener_status @current_listener, 'start' @listener_backoff.started listener_pids[@current_listener.pid] = @current_listener write_procline end |
#status_message(type, pid, status) ⇒ Object
259 260 261 262 263 |
# File 'lib/resqued/master.rb', line 259 def (type, pid, status) if @status_pipe @status_pipe.write("#{type},#{pid},#{status}\n") end end |
#wait_for_workers ⇒ Object
188 189 190 |
# File 'lib/resqued/master.rb', line 188 def wait_for_workers reap_all_listeners end |
#worker_finished(pid) ⇒ Object
Listener message: A worker just stopped working.
Forwards the message to the other listeners.
140 141 142 143 144 145 |
# File 'lib/resqued/master.rb', line 140 def worker_finished(pid) worker_status(pid, 'stop') all_listeners.each do |other| other.worker_finished(pid) end end |
#worker_started(pid) ⇒ Object
Listener message: A worker just started working.
133 134 135 |
# File 'lib/resqued/master.rb', line 133 def worker_started(pid) worker_status(pid, 'start') end |
#worker_status(pid, status) ⇒ Object
255 256 257 |
# File 'lib/resqued/master.rb', line 255 def worker_status(pid, status) ('worker', pid, status) end |
#write_procline ⇒ Object
245 246 247 |
# File 'lib/resqued/master.rb', line 245 def write_procline $0 = "#{procline_version} master [gen #{@listeners_created}] [#{listener_pids.size} running] #{ARGV.join(' ')}" end |
#yawn(duration) ⇒ Object
241 242 243 |
# File 'lib/resqued/master.rb', line 241 def yawn(duration) super(duration, all_listeners.map { |l| l.read_pipe }) end |