Class: Procrastinate::ProcessManager
- Inherits:
-
Object
- Object
- Procrastinate::ProcessManager
- Includes:
- IPC
- Defined in:
- lib/procrastinate/process_manager.rb
Overview
Dispatches and handles tasks and task completion. Only low level unixy manipulation here, no strategy. The only methods you should call from the outside are #setup, #step, #wakeup and #shutdown.
Defined Under Namespace
Classes: ChildProcess, ObjectEndpoint
Instance Attribute Summary collapse
-
#children ⇒ Object
readonly
A hash of <pid, callback> that contains callbacks for all the child processes we spawn.
-
#control_pipe ⇒ Object
readonly
This pipe is used to wait for events in the master process.
Instance Method Summary collapse
-
#cleanup ⇒ Object
Gets executed in child process to clean up file handles and pipes that the master holds.
-
#create_process(task, &completion_handler) ⇒ Object
Spawns a process to work on
task
. -
#decode_and_handle_message(msg) ⇒ Object
Called for every message sent from a child.
-
#initialize ⇒ ProcessManager
constructor
A new instance of ProcessManager.
- #kill_children ⇒ Object
-
#process_count ⇒ Object
Returns the number of child processes that are alive at this point.
-
#read_child_messages ⇒ Object
Once the @cmc_server endpoint is ready, loops and reads all child communication.
-
#reap_childs ⇒ Object
Calls completion handlers for all the childs that have now exited.
-
#register_signals ⇒ Object
Register signals that aid in child care.
-
#setup ⇒ Object
Sets up resource usage for dispatcher.
-
#step ⇒ Object
Performs one step in the dispatchers work.
-
#teardown ⇒ Object
Tears down the dispatcher.
-
#unregister_signals ⇒ Object
Unregister signals.
-
#wait_for_all_childs ⇒ Object
Waits for all childs to complete.
-
#wait_for_event ⇒ Object
Called from the child management thread, will put that thread to sleep until someone requests it to become active again.
-
#wakeup ⇒ Object
Wake up the dispatcher thread.
Constructor Details
#initialize ⇒ ProcessManager
Returns a new instance of ProcessManager.
22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/procrastinate/process_manager.rb', line 22 def initialize # This controls process manager wakeup @control_pipe = IO.pipe # All presently running children @children = {} # Child Master Communication (cmc) endpoint = Endpoint.anonymous @cmc_server = endpoint.server @cmc_client = endpoint.client end |
Instance Attribute Details
#children ⇒ Object (readonly)
A hash of <pid, callback> that contains callbacks for all the child processes we spawn. Once the process is complete, the callback is called in the procrastinate thread.
20 21 22 |
# File 'lib/procrastinate/process_manager.rb', line 20 def children @children end |
#control_pipe ⇒ Object (readonly)
This pipe is used to wait for events in the master process.
15 16 17 |
# File 'lib/procrastinate/process_manager.rb', line 15 def control_pipe @control_pipe end |
Instance Method Details
#cleanup ⇒ Object
Gets executed in child process to clean up file handles and pipes that the master holds.
203 204 205 206 207 208 209 |
# File 'lib/procrastinate/process_manager.rb', line 203 def cleanup # Children dont need the parents signal handler unregister_signals # The child doesn't need the control pipe for now. control_pipe.each { |io| io.close } end |
#create_process(task, &completion_handler) ⇒ Object
Spawns a process to work on task
. If a block is given, it is called when the task completes. This method should only be called from a strategy inside the dispatchers thread. Otherwise it will expose threading issues.
Example:
spawn(wi) { |pid| puts "Task is complete" }
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 |
# File 'lib/procrastinate/process_manager.rb', line 176 def create_process(task, &completion_handler) # Tasks that are interested in getting messages from their childs must # provide a result object that handles incoming 'result' messages. result = task.result pid = fork do cleanup if result endpoint = ObjectEndpoint.new(@cmc_client, Process.pid) task.run(endpoint) else task.run(nil) end exit! # this seems to be needed to avoid rspecs cleanup tasks end # The spawning is done in the same thread as the reaping is done. This is # why no race condition to the following line exists. (or in other code, # for that matter.) children[pid] = ChildProcess.new(completion_handler, result).tap { |s| s.start } end |
#decode_and_handle_message(msg) ⇒ Object
Called for every message sent from a child. The msg
param here is a string that still needs decoding.
140 141 142 143 144 145 146 147 148 149 150 |
# File 'lib/procrastinate/process_manager.rb', line 140 def (msg) pid, obj = Marshal.load(msg) if child=children[pid] child.(obj) else warn "Communication from child #{pid} received, but child is gone." end rescue => b # Messages that cannot be unmarshalled will be ignored. warn "Can't unmarshal child communication." end |
#kill_children ⇒ Object
122 123 124 |
# File 'lib/procrastinate/process_manager.rb', line 122 def kill_children children.delete_if { |pid, child| child.dead? } end |
#process_count ⇒ Object
Returns the number of child processes that are alive at this point. Note that even if a child process is marked dead internally, it counts towards this number, since its results may not have been dispatched yet.
73 74 75 |
# File 'lib/procrastinate/process_manager.rb', line 73 def process_count children.count end |
#read_child_messages ⇒ Object
Once the @cmc_server endpoint is ready, loops and reads all child communication.
128 129 130 131 132 133 134 135 |
# File 'lib/procrastinate/process_manager.rb', line 128 def loop do msg = @cmc_server.receive (msg) break unless @cmc_server.waiting? end end |
#reap_childs ⇒ Object
Calls completion handlers for all the childs that have now exited.
154 155 156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/procrastinate/process_manager.rb', line 154 def reap_childs loop do child_pid, status = Process.waitpid(-1, Process::WNOHANG) break unless child_pid # Trigger the completion callback if child=children[child_pid] child.died end end rescue Errno::ECHILD # Ignore: This means that no childs remain. end |
#register_signals ⇒ Object
Register signals that aid in child care. NB: Because we do this globally, holding more than one dispatcher in a process will not work yet.
82 83 84 |
# File 'lib/procrastinate/process_manager.rb', line 82 def register_signals trap('CHLD') { wakeup } end |
#setup ⇒ Object
Sets up resource usage for dispatcher. You must call this before dispatcher can start its work.
38 39 40 |
# File 'lib/procrastinate/process_manager.rb', line 38 def setup register_signals end |
#step ⇒ Object
Performs one step in the dispatchers work. This will sleep and wait for work to be done, then wake up and reap processes that are still pending. This method will mostly sleep.
46 47 48 49 50 51 |
# File 'lib/procrastinate/process_manager.rb', line 46 def step # Sleep until either work arrives or we receive a SIGCHLD wait_for_event # Reap all processes that have terminated in the meantime. reap_childs end |
#teardown ⇒ Object
Tears down the dispatcher. This frees resources that have been allocated and waits for all children to terminate.
56 57 58 59 |
# File 'lib/procrastinate/process_manager.rb', line 56 def teardown wait_for_all_childs unregister_signals end |
#unregister_signals ⇒ Object
Unregister signals. Process should be as before.
88 89 90 |
# File 'lib/procrastinate/process_manager.rb', line 88 def unregister_signals trap('CHLD', 'DEFAULT') end |
#wait_for_all_childs ⇒ Object
Waits for all childs to complete.
213 214 215 216 217 218 219 |
# File 'lib/procrastinate/process_manager.rb', line 213 def wait_for_all_childs # TODO Maybe signal KILL to children after some time. until children.all? { |p, c| c.dead? } wait_for_event reap_childs end end |
#wait_for_event ⇒ Object
Called from the child management thread, will put that thread to sleep until someone requests it to become active again. See #wakeup.
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/procrastinate/process_manager.rb', line 95 def wait_for_event cp_read_end = control_pipe.first loop do # until we have input in the cp_read_end (control_pipe) ready = Endpoint.select([cp_read_end, @cmc_server]) if ready.include? @cmc_server # Kill children here, since we've just depleted the communication # endpoint. This avoids the situation where the child process # communicates but we remove it from our records before it can be told # about it. kill_children if ready.include? cp_read_end # Consume the data (not important) cp_read_end.read_nonblock(1024) return end end # rescue Errno::EAGAIN, Errno::EINTR # TODO Is this needed? # A signal has been received. Mostly, this is as if we had received # something in the control pipe. end |
#wakeup ⇒ Object
Wake up the dispatcher thread.
63 64 65 66 67 |
# File 'lib/procrastinate/process_manager.rb', line 63 def wakeup control_pipe.last.write '.' # rescue IOError # Ignore: end |