Class: Divvy::Worker
- Inherits:
-
Object
- Object
- Divvy::Worker
- Defined in:
- lib/divvy/worker.rb
Overview
Models an individual divvy worker process. These objects are used in both the master and the forked off workers to perform common tasks and for basic tracking.
Instance Attribute Summary collapse
-
#number ⇒ Object
readonly
The worker number.
-
#pid ⇒ Object
The worker processes’s pid.
-
#socket ⇒ Object
readonly
The Unix domain socket file used to communicate with the master process.
-
#status ⇒ Object
readonly
Process::Status object result of reaping the worker.
-
#verbose ⇒ Object
Whether verbose log info should be written to stderr.
Instance Method Summary collapse
-
#dequeue ⇒ Object
Internal: Retrieve an individual task item from the master process.
-
#initialize(task, number, socket, verbose = false) ⇒ Worker
constructor
Create a Worker object.
- #install_signal_traps ⇒ Object
-
#kill(signal) ⇒ Object
Public: Send a signal to a running worker process.
- #log(message) ⇒ Object
-
#main ⇒ Object
Internal: The main worker loop.
-
#reap ⇒ Object
Public: Attempt to reap this worker’s process using waitpid.
-
#running? ⇒ Boolean
Public: Check whether the worker process is thought to be running.
-
#spawn ⇒ Object
Public: Fork off a new process for this worker and yield to the block immediately in the new child process.
-
#worker_process? ⇒ Boolean
Public: Check whether the current process is this worker process.
Constructor Details
#initialize(task, number, socket, verbose = false) ⇒ Worker
Create a Worker object. The Master object typically handles this.
23 24 25 26 27 28 29 30 |
# File 'lib/divvy/worker.rb', line 23 def initialize(task, number, socket, verbose = false) @task = task @number = number @socket = socket @verbose = verbose @pid = nil @status = nil end |
Instance Attribute Details
#number ⇒ Object (readonly)
The worker number. These are sequential starting from 1 and ending in the configured worker concurrency count.
8 9 10 |
# File 'lib/divvy/worker.rb', line 8 def number @number end |
#pid ⇒ Object
The worker processes’s pid. This is $$ when inside the worker process.
20 21 22 |
# File 'lib/divvy/worker.rb', line 20 def pid @pid end |
#socket ⇒ Object (readonly)
The Unix domain socket file used to communicate with the master process.
11 12 13 |
# File 'lib/divvy/worker.rb', line 11 def socket @socket end |
#status ⇒ Object (readonly)
Process::Status object result of reaping the worker.
17 18 19 |
# File 'lib/divvy/worker.rb', line 17 def status @status end |
#verbose ⇒ Object
Whether verbose log info should be written to stderr.
14 15 16 |
# File 'lib/divvy/worker.rb', line 14 def verbose @verbose end |
Instance Method Details
#dequeue ⇒ Object
Internal: Retrieve an individual task item from the master process. Opens a new socket, reads and unmarshals a single task item.
Returns an Array containing the arguments yielded by the dispatcher.
122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/divvy/worker.rb', line 122 def dequeue client = UNIXSocket.new(@socket) r, w, e = IO.select([client], nil, [client], nil) return if !e.empty? if data = client.read(16384) Marshal.load(data) end rescue Errno::ENOENT => boom # socket file went away, bail out ensure client.close if client end |
#install_signal_traps ⇒ Object
136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/divvy/worker.rb', line 136 def install_signal_traps fail "attempt to install worker signal handling in master" if !worker_process? %w[INT TERM QUIT].each do |signal| trap signal do next if @shutdown @shutdown = true log "#{signal} received. initiating graceful shutdown..." end end end |
#kill(signal) ⇒ Object
Public: Send a signal to a running worker process.
signal - String signal name.
Returns true when the process was signaled, false if the process is no longer running. Raises when the worker process is not thought to be running.
45 46 47 48 49 50 51 52 |
# File 'lib/divvy/worker.rb', line 45 def kill(signal) fail "worker not running" if @pid.nil? log "sending signal #{signal}" Process.kill(signal, @pid) true rescue Errno::ESRCH false end |
#log(message) ⇒ Object
148 149 150 151 |
# File 'lib/divvy/worker.rb', line 148 def log() return if !verbose $stderr.printf("worker [%d]: %s\n", number, ) end |
#main ⇒ Object
Internal: The main worker loop. This is called after a new worker process has been setup with signal traps and whatnot and connects to the master in a loop to retrieve task items. The worker process exits if this method returns or raises an exception.
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/divvy/worker.rb', line 101 def main fail "Worker#main called in master process" if !worker_process? log "booted with pid #{@pid}" @task.after_fork(self) while arguments = dequeue @task.process(*arguments) break if @shutdown end # worker should exit on return rescue Exception => boom warn "error: worker [#{number}]: #{boom.class} #{boom.to_s}" exit 1 end |
#reap ⇒ Object
Public: Attempt to reap this worker’s process using waitpid. This is a no-op if the process is not thought to be running or is marked as already being reaped. This should only be called in the master process.
Returns the Process::Status object if the process was reaped, nil if the process was not reaped because it’s still running or is already reaped.
89 90 91 92 93 94 95 |
# File 'lib/divvy/worker.rb', line 89 def reap if @status.nil? && @pid && Process::waitpid(@pid, Process::WNOHANG) @status = $? log "exited, reaped pid #{@pid} (status: #{@status.exitstatus})" @status end end |
#running? ⇒ Boolean
Public: Check whether the worker process is thought to be running. This does not attempt to verify the real state of the process with the system.
34 35 36 |
# File 'lib/divvy/worker.rb', line 34 def running? @pid && @status.nil? end |
#spawn ⇒ Object
Public: Fork off a new process for this worker and yield to the block immediately in the new child process.
Returns the pid of the new process in the master process. Never returns in the child process. Raises when the worker process is already thought to be running or has not yet been reaped.
68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/divvy/worker.rb', line 68 def spawn fail "worker already running" if running? @status = nil if (@pid = fork).nil? @pid = $$ yield install_signal_traps main exit 0 end @pid end |
#worker_process? ⇒ Boolean
Public: Check whether the current process is this worker process.
Returns true when we’re in this worker, false in the master.
57 58 59 |
# File 'lib/divvy/worker.rb', line 57 def worker_process? @pid == $$ end |