Class: Divvy::Master
- Inherits: Object
- Defined in: lib/divvy/master.rb
Overview
The master process used to generate and distribute task items to the worker processes.
Defined Under Namespace
Classes: BootFailure, Shutdown
Instance Attribute Summary
- #failures ⇒ Object (readonly)
  Number of worker processes that have exited with a failure status since the master started processing work.
- #socket ⇒ Object (readonly)
  The string filename of the unix domain socket used to distribute work.
- #spawn_count ⇒ Object (readonly)
  Number of worker processes that have been spawned since the master started processing work.
- #tasks_distributed ⇒ Object (readonly)
  Number of tasks that have been distributed to worker processes.
- #verbose ⇒ Object
  Enable verbose logging to stderr.
- #worker_count ⇒ Object (readonly)
  The number of worker processes to boot.
- #workers ⇒ Object (readonly)
  The array of Worker objects this master is managing.
Instance Method Summary
- #boot_worker(worker) ⇒ Object
  Internal: Boot an individual worker process.
- #boot_workers ⇒ Object
  Internal: Boot any workers that are not currently running.
- #initialize(task, worker_count, verbose = false, socket = nil) ⇒ Master (constructor)
  Create the master process object.
- #install_signal_traps ⇒ Object
  Internal: Install traps for shutdown signals.
- #kill_workers(signal = 'TERM') ⇒ Object
  Internal: Send a signal to all running workers.
- #log(message) ⇒ Object
  Internal: Write a verbose log message to stderr.
- #master_process? ⇒ Boolean
  Public: Check if the current process is the master process.
- #reap_workers ⇒ Object
  Internal: Attempt to reap all worker processes via Process::waitpid.
- #reset_signal_traps ⇒ Object
  Internal: Uninstall signal traps set up by the install_signal_traps method.
- #run ⇒ Object
  Public: Start the main run loop.
- #shutdown ⇒ Object
  Public: Initiate shutdown of the run loop.
- #shutdown! ⇒ Object
  Internal: Really shut down the unix socket and reap all worker processes.
- #start_server ⇒ Object
  Internal: Create and bind to the unix domain socket.
- #stop_server ⇒ Object
  Internal: Close and remove the unix domain socket.
- #worker_process? ⇒ Boolean
  Public: Check if the current process is a worker process.
- #workers_running? ⇒ Boolean
  Public: Are any worker processes currently running or not yet reaped by the master process?
Constructor Details
#initialize(task, worker_count, verbose = false, socket = nil) ⇒ Master
Create the master process object.
task         - Object that implements the Parallelizable interface.
worker_count - Number of worker processes.
verbose      - Enable verbose error logging.
socket       - The unix domain socket filename. Defaults to "/tmp/divvy-<pid>-<object_id>.sock".
The workers array is initialized with one Worker object per requested worker process. The processes themselves are not started until #run is called.
# File 'lib/divvy/master.rb', line 49
def initialize(task, worker_count, verbose = false, socket = nil)
  @task = task
  @worker_count = worker_count
  @verbose = verbose
  @socket = socket || "/tmp/divvy-#{$$}-#{object_id}.sock"
  # stats
  @tasks_distributed = 0
  @failures = 0
  @spawn_count = 0
  # shutdown state
  @shutdown = false
  @graceful = true
  @reap = false
  # worker objects
  @workers = []
  (1..@worker_count).each do |worker_num|
    worker = Divvy::Worker.new(@task, worker_num, @socket, @verbose)
    workers << worker
  end
end
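Master itself only calls two methods on the task object: dispatch (from #run) and before_fork (from #boot_worker). The sketch below is a hypothetical task covering just those two; the class name NumberTask is illustrative, and the worker-side half of the Parallelizable interface is defined elsewhere and is not reproduced here. It also shows that, because #run declares its block as |*task_item|, each yielded item arrives wrapped in an array before being marshaled.

```ruby
# Hypothetical task object covering only the two task methods that
# lib/divvy/master.rb itself calls. The worker-side part of the
# Parallelizable interface is not shown on this page and is omitted.
class NumberTask
  def initialize(numbers)
    @numbers = numbers
  end

  # Master#run calls dispatch with a block declared as |*task_item|,
  # so each yielded value reaches the master as an array.
  def dispatch
    @numbers.each { |n| yield n }
  end

  # Master#boot_worker calls this in the master right before forking.
  def before_fork(worker)
    # e.g. close connections that must not be shared with the child
  end
end

# With divvy loaded, the documented constructor would be used like:
#   master = Divvy::Master.new(NumberTask.new(1..10), 4)
#   master.run
```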
Instance Attribute Details
#failures ⇒ Object (readonly)
Number of worker processes that have exited with a failure status since the master started processing work.
# File 'lib/divvy/master.rb', line 24
def failures
  @failures
end
#socket ⇒ Object (readonly)
The string filename of the unix domain socket used to distribute work.
# File 'lib/divvy/master.rb', line 14
def socket
  @socket
end
#spawn_count ⇒ Object (readonly)
Number of worker processes that have been spawned since the master started processing work.
# File 'lib/divvy/master.rb', line 28
def spawn_count
  @spawn_count
end
#tasks_distributed ⇒ Object (readonly)
Number of tasks that have been distributed to worker processes.
# File 'lib/divvy/master.rb', line 20
def tasks_distributed
  @tasks_distributed
end
#verbose ⇒ Object
Enable verbose logging to stderr.
# File 'lib/divvy/master.rb', line 17
def verbose
  @verbose
end
#worker_count ⇒ Object (readonly)
The number of worker processes to boot.
# File 'lib/divvy/master.rb', line 8
def worker_count
  @worker_count
end
#workers ⇒ Object (readonly)
The array of Worker objects this master is managing.
# File 'lib/divvy/master.rb', line 11
def workers
  @workers
end
Instance Method Details
#boot_worker(worker) ⇒ Object
Internal: Boot an individual worker process. Don’t call this if the worker is thought to be running.
worker - The Worker object to boot.
Returns the Worker object provided.
# File 'lib/divvy/master.rb', line 212
def boot_worker(worker)
  fail "worker #{worker.number} already running" if worker.running?
  fail "attempt to boot worker without server" if !@server
  @task.before_fork(worker)
  worker.spawn do
    reset_signal_traps
    @workers = nil
    @server.close
    @server = nil
    $stdin.close
  end
  @spawn_count += 1
  worker
end
#boot_workers ⇒ Object
Internal: Boot any workers that are not currently running. This is a no-op if all workers are thought to be running. No attempt is made here to verify that worker processes are actually running: only workers that have not yet been booted and those previously marked as reaped are started.
# File 'lib/divvy/master.rb', line 199
def boot_workers
  workers.each do |worker|
    next if worker.running?
    boot_worker(worker)
  end
end
#install_signal_traps ⇒ Object
Internal: Install traps for shutdown signals. Most signals deal with shutting down the master loop and socket.
INFO      - Dump stack for all processes to stderr.
TERM      - Initiate immediate forceful shutdown of all worker processes along with the master process, aborting any jobs in progress.
INT, QUIT - Initiate graceful shutdown, allowing existing worker processes to finish their current task and exit on their own. A repeat of the signal within 10 seconds of the first is ignored; a repeat more than 10 seconds later initiates an immediate forceful shutdown, as with TERM. This is mostly so you can interrupt sanely with Ctrl+C with the master foregrounded.
CHLD      - Set the worker reap flag. The run loop then reaps workers while waiting for the next worker connection or right after the current task item is dispatched.
Returns the array of previous INT, QUIT, CHLD, and TERM handlers (in that order), which reset_signal_traps uses to restore them.
# File 'lib/divvy/master.rb', line 276
def install_signal_traps
  @traps = %w[INT QUIT].map do |signal|
    Signal.trap signal do
      if @shutdown
        raise Shutdown, "SIG#{signal}" if (Time.now - @shutdown) > 10 # seconds
        next
      else
        shutdown
        log "#{signal} received. initiating graceful shutdown..."
      end
    end
  end
  @traps << Signal.trap("CHLD") { @reap = true }
  @traps << Signal.trap("TERM") { raise Shutdown, "SIGTERM" }
  Signal.trap "INFO" do
    message = "==> info: process #$$ dumping stack\n" << caller.join("\n").gsub(/^/, " ").gsub("#{Dir.pwd}/", "")
    $stderr.puts(message)
  end
  @traps
end
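The INT/QUIT handling is a small state machine: the first signal records a timestamp through #shutdown, a repeat within 10 seconds is ignored, and a later repeat raises Shutdown. The sketch below is a hypothetical stand-alone model of that logic (ShutdownLatch and its injectable clock are illustrative, not part of divvy), so the 10-second window can be exercised without real signals or waiting.

```ruby
# Hypothetical model of the INT/QUIT trap body in Master#install_signal_traps
# together with Master#shutdown. The clock is injectable so the
# 10-second window can be tested without sleeping.
class ShutdownLatch
  class Shutdown < StandardError; end

  GRACE_SECONDS = 10

  def initialize(clock: -> { Time.now })
    @clock = clock
    @shutdown = nil
  end

  # Mirrors Master#shutdown: only the first request time is kept.
  def shutdown
    @shutdown ||= @clock.call
  end

  # Mirrors the trap body: start a graceful shutdown on the first signal,
  # ignore repeats inside the grace window, force shutdown after it.
  def on_signal(name)
    if @shutdown
      raise Shutdown, "SIG#{name}" if (@clock.call - @shutdown) > GRACE_SECONDS
      :ignored
    else
      shutdown
      :graceful
    end
  end
end
```

Recording the time only once (with ||=) is what makes the window measured from the first signal rather than from the most recent one.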
#kill_workers(signal = 'TERM') ⇒ Object
Internal: Send a signal to all running workers.
signal - The string signal name.
Returns nothing.
# File 'lib/divvy/master.rb', line 237
def kill_workers(signal = 'TERM')
  workers.each do |worker|
    next if !worker.running?
    worker.kill(signal)
  end
end
#log(message) ⇒ Object
Internal: Write a verbose log message to stderr.
# File 'lib/divvy/master.rb', line 318
def log(message)
  return if !verbose
  $stderr.printf("master: %s\n", message)
end
#master_process? ⇒ Boolean
Public: Check if the current process is the master process.
Returns a truthy value (the workers array) in the master process and nil in a worker process.
# File 'lib/divvy/master.rb', line 135
def master_process?
  @workers
end
#reap_workers ⇒ Object
Internal: Attempt to reap all worker processes via Process::waitpid. This method does not block waiting for processes to exit. Running processes are ignored.
Returns an array of Worker objects whose processes were reaped. The Worker#status attribute can be used to access the Process::Status result.
# File 'lib/divvy/master.rb', line 250
def reap_workers
  @reap = false
  workers.select do |worker|
    if status = worker.reap
      @failures += 1 if !status.success?
      worker
    end
  end
end
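Worker#reap is not shown on this page, so the sketch below substitutes Process.waitpid2 with Process::WNOHANG to illustrate the same non-blocking idea: exited children are collected, children still running are skipped, and non-zero exits are counted as failures. ReapSketch is a hypothetical name, and tracking bare PIDs instead of Worker objects is an assumption made to keep it self-contained.

```ruby
# Hypothetical sketch of non-blocking reaping in the style of
# Master#reap_workers. Process.waitpid2 with WNOHANG stands in for
# Divvy's Worker#reap, which is not shown here.
class ReapSketch
  attr_reader :failures

  def initialize(pids)
    @pids = pids   # child PIDs not yet reaped
    @failures = 0
  end

  def running?
    !@pids.empty?
  end

  # Returns [pid, Process::Status] pairs for children that have exited.
  # Children still running are skipped without blocking.
  def reap
    reaped = []
    @pids.each do |pid|
      result = Process.waitpid2(pid, Process::WNOHANG)
      next if result.nil?
      @failures += 1 unless result[1].success?
      reaped << result
    end
    @pids -= reaped.map(&:first)
    reaped
  end
end
```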
#reset_signal_traps ⇒ Object
Internal: Uninstall signal traps set up by the install_signal_traps method. This is called immediately after forking worker processes to reset traps to their default implementations and also when the master process shuts down.
# File 'lib/divvy/master.rb', line 305
def reset_signal_traps
  return if @traps.nil? || @traps.empty?
  %w[INT QUIT CHLD TERM].each do |signal|
    handler = @traps.shift || "DEFAULT"
    if handler.is_a?(String)
      Signal.trap(signal, handler)
    else
      Signal.trap(signal, &handler)
    end
  end
end
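The install/reset pair works because Signal.trap returns the previous handler, which may be a String such as "DEFAULT", a Proc, or nil. The hypothetical sketch below applies the same save-and-restore pattern to USR1 and USR2 (chosen here so it does not interfere with INT or TERM); the helper names are illustrative only.

```ruby
# Hypothetical sketch of the save/restore pattern used by
# Master#install_signal_traps and Master#reset_signal_traps,
# applied to USR1/USR2 instead of the real shutdown signals.
SKETCH_SIGNALS = %w[USR1 USR2].freeze

# Signal.trap returns the previous handler, so collecting the return
# values (in SKETCH_SIGNALS order) is enough to undo the install later.
def install_sketch_traps(received)
  SKETCH_SIGNALS.map do |signal|
    Signal.trap(signal) { received << signal }
  end
end

# String handlers ("DEFAULT", "IGNORE", ...) are passed positionally,
# Proc handlers as a block; a nil previous handler falls back to
# "DEFAULT", as in reset_signal_traps.
def reset_sketch_traps(saved)
  SKETCH_SIGNALS.each do |signal|
    handler = saved.shift || "DEFAULT"
    if handler.is_a?(String)
      Signal.trap(signal, handler)
    else
      Signal.trap(signal, &handler)
    end
  end
end
```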
#run ⇒ Object
Public: Start the main run loop. This installs signal handlers into the current process, binds to the unix domain socket, boots workers, and begins dispatching work.
The run method does not return until all task items generated have been processed unless a shutdown signal is received or the #shutdown method is called within the loop.
Returns nothing. Raises BootFailure when the workers fail to start. Raises Shutdown when a forceful shutdown is triggered (SIGTERM, or a repeated SIGINT/SIGQUIT more than 10 seconds after the first).
# File 'lib/divvy/master.rb', line 84
def run
  fail "Already running!!!" if @server
  fail "Attempt to run master in a worker process" if worker_process?
  install_signal_traps
  start_server
  @task.dispatch do |*task_item|
    # boot workers that haven't started yet or have been reaped
    boot_workers
    # check for shutdown or worker reap flag until a connection is pending
    # in the domain socket queue. bail out if workers exited before even
    # requesting a task item.
    while IO.select([@server], nil, nil, 0.010).nil?
      break if @shutdown
      if @reap
        reap_workers
        if !workers_running? && @tasks_distributed == 0
          raise BootFailure, "Worker processes failed to boot."
        else
          boot_workers
        end
      end
    end
    break if @shutdown
    # at this point there should be at least one connection pending.
    begin
      data = Marshal.dump(task_item)
      sock = @server.accept
      sock.write(data)
    ensure
      sock.close if sock
    end
    @tasks_distributed += 1
    break if @shutdown
    reap_workers if @reap
  end
  nil
rescue Shutdown
  @graceful = false
  @shutdown = true
ensure
  shutdown! if master_process?
end
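Each pass of the dispatch block uses one connection per task item: poll the server socket with a 10 ms IO.select, accept, write the marshaled item, and close. The hypothetical helpers below (dispatch_one, fetch_one) sketch both ends of that exchange; the worker side is an assumption inferred from the master's behavior (read until the master closes the connection, then unmarshal), since Divvy::Worker's code is not shown here. The shutdown and reap checks of the real loop are left out.

```ruby
require "socket"

# Hypothetical helper mirroring one pass of the dispatch block in
# Master#run: wait for a pending connection, accept it, write the
# marshaled task item, and close the connection.
def dispatch_one(server, task_item, poll: 0.010, max_polls: 500)
  polls = 0
  while IO.select([server], nil, nil, poll).nil?
    polls += 1
    return false if polls >= max_polls # give up rather than spin forever
  end
  sock = server.accept
  sock.write(Marshal.dump(task_item))
  true
ensure
  sock.close if sock
end

# Hypothetical worker side: connect, read until the master closes the
# connection, and unmarshal the task item.
def fetch_one(path)
  UNIXSocket.open(path) { |sock| Marshal.load(sock.read) }
end
```

Closing the connection after each write is what lets the reader treat EOF as the end of the task item, so no length prefix is needed.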
#shutdown ⇒ Object
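Public: Initiate shutdown of the run loop. The loop is not stopped when this method returns; #run returns once the iteration for the current task item completes. Only the first call records the shutdown time, which the INT/QUIT traps compare against to decide when a repeated signal forces shutdown.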
# File 'lib/divvy/master.rb', line 156
def shutdown
  @shutdown ||= Time.now
end
#shutdown! ⇒ Object
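Internal: Really shut down the unix socket and reap all worker processes. During a graceful shutdown the workers are not signaled; closing and removing the socket is relied upon to make them exit normally. During a forceful shutdown (after #run has rescued Shutdown), still-running workers are sent SIGKILL on every pass until all are reaped, and Shutdown is raised again at the end.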
TODO Send SIGKILL when workers stay running for configurable period.
# File 'lib/divvy/master.rb', line 165
def shutdown!
  fail "Master#shutdown! called in worker process" if worker_process?
  stop_server
  while workers_running?
    kill_workers("KILL") if !@graceful
    reaped = reap_workers
    sleep 0.010 if reaped.empty?
  end
  reset_signal_traps
  raise Shutdown if !@graceful
end
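The drain loop in #shutdown! can be illustrated with plain child processes. The hypothetical drain_children helper below keeps going until every child is reaped, sends SIGKILL on each pass only when the shutdown is forceful, and sleeps 10 ms whenever a pass reaps nothing; using bare PIDs and Process.waitpid2 in place of Worker objects is an assumption made for the sketch.

```ruby
# Hypothetical sketch of the drain loop in Master#shutdown!, using bare
# child PIDs and Process.waitpid2 in place of Worker objects.
def drain_children(pids, graceful:)
  statuses = {}
  until pids.empty?
    # forceful shutdown: kill anything not yet reaped on every pass
    pids.each { |pid| Process.kill("KILL", pid) } unless graceful
    # non-blocking reap; nil means that child has not exited yet
    reaped = pids.map { |pid| Process.waitpid2(pid, Process::WNOHANG) }.compact
    reaped.each { |pid, status| statuses[pid] = status }
    pids -= reaped.map(&:first)
    sleep 0.010 if reaped.empty?
  end
  statuses
end
```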
#start_server ⇒ Object
Internal: Create and bind to the unix domain socket, removing any stale socket file first. The listen backlog is set to the number of workers; otherwise workers could get ECONNREFUSED when attempting to connect to the master, and would exit.
# File 'lib/divvy/master.rb', line 180
def start_server
  fail "Master#start_server called in worker process" if worker_process?
  File.unlink(@socket) if File.exist?(@socket)
  @server = UNIXServer.new(@socket)
  @server.listen(worker_count)
end
#stop_server ⇒ Object
Internal: Close and remove the unix domain socket.
# File 'lib/divvy/master.rb', line 188
def stop_server
  fail "Master#stop_server called in worker process" if worker_process?
  File.unlink(@socket) if File.exist?(@socket)
  @server.close if @server
  @server = nil
end
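The socket lifecycle can be tried on its own. The hypothetical helpers below follow the same steps as #start_server and #stop_server: unlink any stale file at the path, bind a UNIXServer, set the listen backlog to the worker count, and later unlink the path and close the server. The function names and the test path are illustrative.

```ruby
require "socket"

# Hypothetical stand-alone version of Master#start_server.
def start_unix_server(path, backlog)
  File.unlink(path) if File.exist?(path) # remove a stale socket or file
  server = UNIXServer.new(path)
  server.listen(backlog)                 # one pending connection per worker
  server
end

# Hypothetical stand-alone version of Master#stop_server.
def stop_unix_server(server, path)
  File.unlink(path) if File.exist?(path)
  server&.close
  nil
end
```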
#worker_process? ⇒ Boolean
Public: Check if the current process is a worker process. This relies on @workers being set to nil in each forked worker process (see #boot_worker).
Returns true in the worker process, false in master processes.
# File 'lib/divvy/master.rb', line 143
def worker_process?
  !master_process?
end
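Because fork copies the Master object into the child, the role check is just whether @workers survived: the block that #boot_worker runs in each child sets it to nil. The hypothetical RoleSketch below reproduces that trick with a plain fork and reports the child's view through its exit status (0 for worker, 1 for master); the class and method names are illustrative.

```ruby
# Hypothetical reduction of Master#master_process? / #worker_process?.
# The child clears @workers after fork, as boot_worker's spawn block does.
class RoleSketch
  def initialize(worker_count)
    @workers = Array.new(worker_count) { |i| i + 1 }
  end

  # Like the original, returns the workers array (truthy) or nil.
  def master_process?
    @workers
  end

  def worker_process?
    !master_process?
  end

  # Fork, clear @workers in the child, and report the child's role
  # through its exit status: 0 = worker, 1 = still looks like master.
  def child_role_exit_status
    pid = fork do
      @workers = nil
      exit!(worker_process? ? 0 : 1)
    end
    Process.wait(pid)
    $?.exitstatus
  end
end
```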
#workers_running? ⇒ Boolean
Public: Are any worker processes currently running or not yet reaped by the master process?
# File 'lib/divvy/master.rb', line 149
def workers_running?
  @workers.any? { |worker| worker.running? }
end