Class: Procrastinate::ProcessManager
- Inherits: Object
  - Object
  - Procrastinate::ProcessManager
- Defined in: lib/procrastinate/process_manager.rb
Overview
Dispatches tasks and handles task completion. This class contains only low-level Unix process manipulation, no strategy. The only methods you should call from the outside are #setup, #step, #wakeup and #teardown.
Instance Attribute Summary
-
#children ⇒ Object
readonly
A hash of <pid, callback> that contains callbacks for all the child processes we spawn.
-
#control_pipe ⇒ Object
readonly
This pipe is used to wait for events in the master process.
Instance Method Summary
-
#cleanup ⇒ Object
Gets executed in child process to clean up file handles and pipes that the master holds.
-
#create_process(task, &completion_handler) ⇒ void
Spawns a process to work on task.
-
#finalize_children ⇒ Object
-
#initialize ⇒ ProcessManager
constructor
A new instance of ProcessManager.
-
#kill_processes(signal = 'QUIT') ⇒ Object
Kills all running processes by sending them a signal (QUIT by default).
-
#process_count ⇒ Object
Returns the number of child processes that are alive at this point.
-
#reap_childs ⇒ Object
Calls completion handlers for all the children that have now exited.
-
#register_signals ⇒ Object
Register signals that aid in child care.
-
#setup ⇒ Object
Sets up resource usage for dispatcher.
-
#step ⇒ Object
Performs one step in the dispatcher's work.
-
#teardown ⇒ Object
Tears down the dispatcher.
-
#unregister_signals ⇒ Object
Unregister signals.
-
#wait_for_all_childs ⇒ Object
Waits for all children to complete.
-
#wait_for_event ⇒ Object
Called from the child management thread; puts that thread to sleep until someone requests it to become active again.
-
#wakeup ⇒ Object
Wake up the dispatcher thread.
Constructor Details
#initialize ⇒ ProcessManager
Returns a new instance of ProcessManager.
# File 'lib/procrastinate/process_manager.rb', line 19
def initialize
  # This controls process manager wakeup
  @control_pipe = IO.pipe

  # All presently running children
  @children = {}
end
Instance Attribute Details
#children ⇒ Object (readonly)
A hash of <pid, callback> that contains callbacks for all the child processes we spawn. Once the process is complete, the callback is called in the procrastinate thread.
# File 'lib/procrastinate/process_manager.rb', line 17
def children
  @children
end
#control_pipe ⇒ Object (readonly)
This pipe is used to wait for events in the master process.
# File 'lib/procrastinate/process_manager.rb', line 12
def control_pipe
  @control_pipe
end
Instance Method Details
#cleanup ⇒ Object
Gets executed in child process to clean up file handles and pipes that the master holds.
# File 'lib/procrastinate/process_manager.rb', line 198
def cleanup
  # Children don't need the parent's signal handler
  unregister_signals

  # The child doesn't need the control pipe for now.
  control_pipe.each { |io| io.close }
end
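The effect of closing inherited pipe ends in a forked child can be illustrated with a small standalone sketch. It does not use ProcessManager; the variable names are assumptions made for illustration only. The point it tries to show is that closing the descriptors in the child leaves the parent's copies open.

```ruby
# Standalone sketch (not library code): a forked child closes its inherited
# copies of a pipe, while the parent's copies remain usable.
reader, writer = IO.pipe

pid = fork do
  # The child inherits both ends of the pipe; close them, as #cleanup does
  # with the control pipe.
  [reader, writer].each(&:close)

  # Report through the exit status whether both ends are closed in the child.
  exit!(reader.closed? && writer.closed? ? 0 : 1)
end

_, status = Process.wait2(pid)

# In the parent, neither end has been touched.
parent_still_open = !reader.closed? && !writer.closed?
```

Closing unused descriptors in the child also keeps the child from holding the pipe open after the parent is done with it.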
#create_process(task, &completion_handler) ⇒ void
This method returns an undefined value.
Spawns a process to work on task. If a block is given, it is called when the task completes. This method should only be called from a strategy inside the dispatcher's thread; calling it from elsewhere will expose threading issues.
# File 'lib/procrastinate/process_manager.rb', line 166
def create_process(task, &completion_handler)
  # Tasks that are interested in getting messages from their children must
  # provide a result object that handles incoming 'result' messages.
  result = task.result
  child_process = ChildProcess.new(completion_handler, result)

  pid = fork do
    cleanup

    if result
      endpoint = lambda { |obj| child_process.(obj) }
      task.run(endpoint)
    else
      task.run(nil)
    end

    exit! # this seems to be needed to avoid rspecs cleanup tasks
  end

  # This should never fire: New children are spawned only after we lose
  # track of the old ones because they have been successfully processed.
  fail "PID REUSE!" if children.has_key?(pid)

  # The spawning is done in the same thread as the reaping is done. This is
  # why no race condition to the following line exists. (or in other code,
  # for that matter.)
  children[pid] = child_process.tap { |s| s.start }
end
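The fork-and-register pattern above can be sketched in isolation. The class below is a hypothetical, simplified stand-in written for this illustration: it stores a plain callback per pid instead of a ChildProcess object, and it waits with a blocking Process.wait2 instead of the pipe-based event loop the library uses.

```ruby
# Hypothetical miniature of the pattern; TinyManager is not part of the
# procrastinate library.
class TinyManager
  attr_reader :children

  def initialize
    @children = {} # pid => completion callback
  end

  # Forks a child that runs `work` and remembers the completion block.
  def create_process(work, &on_complete)
    pid = fork do
      work.call
      exit!(0) # skip at_exit handlers inherited from the parent
    end

    fail 'PID REUSE!' if children.key?(pid)
    children[pid] = on_complete
    pid
  end

  # Blocks until every registered child has exited, firing its callback.
  def wait_all
    until children.empty?
      pid, status = Process.wait2
      callback = children.delete(pid)
      callback.call(status) if callback
    end
  end
end

manager = TinyManager.new
outcomes = []
manager.create_process(-> { sleep 0.05 }) { |status| outcomes << status.success? }
manager.wait_all
```

Registering the callback and reaping in the same thread is what lets the sketch, like the original, avoid locking around the children hash.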
#finalize_children ⇒ Object
# File 'lib/procrastinate/process_manager.rb', line 124
def finalize_children
  children.
    select { |pid, child| child.stopped? }.
    each { |pid, child| child.finalize }

  children.delete_if { |pid, child| child.removable? }
end
#kill_processes(signal = 'QUIT') ⇒ Object
Kills all running processes by sending them a signal (QUIT by default).
# File 'lib/procrastinate/process_manager.rb', line 221
def kill_processes(signal='QUIT')
  children.each do |pid, process|
    Process.kill(signal, pid)
  end
end
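A standalone sketch of signalling a set of children follows. The helper kill_all is hypothetical and only mirrors the shape of #kill_processes; rescuing Errno::ESRCH is an assumption added for this sketch and is not something the method above does.

```ruby
# Hypothetical helper, not library code: send a signal to each pid.
def kill_all(pids, signal = 'QUIT')
  pids.each do |pid|
    begin
      Process.kill(signal, pid)
    rescue Errno::ESRCH
      # Assumption for this sketch: ignore children that no longer exist.
    end
  end
end

# Start two children that would otherwise sleep for a long time.
pids = Array.new(2) { fork { sleep 30 } }

# Override the default signal, as the optional argument allows.
kill_all(pids, 'TERM')

# Collect the exit statuses so that no zombies are left behind.
statuses = pids.map { |pid| Process.wait2(pid).last }
```

Sending the signal does not reap the children; the parent still has to wait for them, which is what #wait_for_all_childs and #reap_childs are for.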
#process_count ⇒ Object
Returns the number of child processes that are alive at this point. Note that even if a child process is marked dead internally, it counts towards this number, since its results may not have been dispatched yet.
# File 'lib/procrastinate/process_manager.rb', line 65
def process_count
  children.count
end
#reap_childs ⇒ Object
Calls completion handlers for all the children that have now exited.
# File 'lib/procrastinate/process_manager.rb', line 135
def reap_childs
  loop do
    child_pid, status = Process.waitpid(-1, Process::WNOHANG)
    break unless child_pid

    # Trigger the completion callback
    if child=children[child_pid]
      child.sigchld_received
      # Maybe there are messages queued for this child. If nothing is queued
      # up, the thread will hang in the select in #wait_for_event unless
      # we wake it up.
      wakeup
    end
  end
rescue Errno::ECHILD
  # Ignore: This means that no children remain.
end
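The non-blocking reaping loop can be tried out on its own. The sketch below is an illustration under assumptions, not library code: it uses Process.wait2 so that the exit status is available, and it polls with a short sleep where the library would block in #wait_for_event.

```ruby
# Standalone sketch: reap children without blocking, using WNOHANG.
pids = Array.new(3) { |i| fork { exit!(i) } }
reaped = {} # pid => exit status code

until reaped.size == pids.size
  begin
    loop do
      pid, status = Process.wait2(-1, Process::WNOHANG)
      break unless pid # nil: children exist, but none has exited yet
      reaped[pid] = status.exitstatus
    end
  rescue Errno::ECHILD
    break # no children remain at all
  end

  sleep 0.01 # polling stand-in for a real event wait
end
```

With WNOHANG, a nil return means "nothing to reap right now", while Errno::ECHILD means "there is nothing left to wait for"; the two cases need separate handling.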
#register_signals ⇒ Object
Register signals that aid in child care. NB: Because we do this globally, holding more than one dispatcher in a process will not work yet.
# File 'lib/procrastinate/process_manager.rb', line 74
def register_signals
  trap('CHLD') { wakeup }
end
#setup ⇒ Object
Sets up resource usage for the dispatcher. You must call this before the dispatcher can start its work.
# File 'lib/procrastinate/process_manager.rb', line 30
def setup
  register_signals
end
#step ⇒ Object
Performs one step in the dispatcher's work. This will sleep and wait for work to be done, then wake up and reap processes that are still pending. This method will mostly sleep.
# File 'lib/procrastinate/process_manager.rb', line 38
def step
  # Sleep until either work arrives or we receive a SIGCHLD
  wait_for_event

  # Reap all processes that have terminated in the meantime.
  reap_childs
end
#teardown ⇒ Object
Tears down the dispatcher. This frees resources that have been allocated and waits for all children to terminate.
# File 'lib/procrastinate/process_manager.rb', line 48
def teardown
  wait_for_all_childs
  unregister_signals
end
#unregister_signals ⇒ Object
Unregisters signals, restoring the default CHLD handling.
# File 'lib/procrastinate/process_manager.rb', line 80
def unregister_signals
  trap('CHLD', 'DEFAULT')
end
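Registering and unregistering a CHLD handler can be sketched as a pair. This is a standalone illustration, not library code: the pipe stands in for the control pipe, and restoring the previously installed handler (rather than always 'DEFAULT') is an assumption made for the sketch.

```ruby
# Standalone sketch: a CHLD handler that only writes a byte to a pipe.
reader, writer = IO.pipe

# Kernel#trap returns the handler that was installed before.
previous = trap('CHLD') { writer.write_nonblock('.') }

pid = fork { exit!(0) }

# Wait up to 5 seconds for the handler to notify us through the pipe.
ready, = IO.select([reader], nil, nil, 5)
notified = !ready.nil?

Process.wait(pid)

# Put the earlier handler back; fall back to 'DEFAULT' if none was reported.
trap('CHLD', previous || 'DEFAULT')
```

Keeping the handler down to a single pipe write means the real work happens in the normal flow of the program, not inside the signal handler.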
#wait_for_all_childs ⇒ Object
Waits for all children to complete.
# File 'lib/procrastinate/process_manager.rb', line 208
def wait_for_all_childs
  # TODO Maybe signal KILL to children after some time.
  until children.empty?
    wait_for_event
    reap_childs
    finalize_children
  end
end
#wait_for_event ⇒ Object
Called from the child management thread; puts that thread to sleep until someone requests it to become active again. See #wakeup.
This method also depletes the child queue, reading end-of-processing messages from all children and dispatching them to the corresponding child objects.
# File 'lib/procrastinate/process_manager.rb', line 90
def wait_for_event
  cp_read_end = control_pipe.first

  loop do # until we have input in the cp_read_end (control_pipe)
    io_map = children.inject({}) { |map, (_, child)|
      map[child.master_pipe] = child; map }

    ready, _, _ = IO.select(io_map.keys + [cp_read_end], [], [], 0.1)
    next unless ready

    # Process all messages that were sent from our children to us.
    ready.each { |io|
      next if io == cp_read_end

      child = io_map[io]

      fail "Assert: All IOs correspond to a child" unless child
      child.
    }

    # Send the tracking code for the child processes the final notifications
    # and remove them from the children hash. At this point we know that
    # no messages are waiting in the child queue.
    finalize_children

    if ready.include?(cp_read_end)
      # Consume the data (not important)
      cp_read_end.read_nonblock(1024)

      # And return to our caller. This is the event we've been waiting for.
      return
    end
  end
end
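The select loop above multiplexes child pipes and the control pipe. A reduced, standalone sketch of that idea follows; the symbols :a and :b are hypothetical stand-ins for child objects, and the messages are plain strings rather than whatever the library actually sends.

```ruby
# Standalone sketch: wait on several pipes at once and map each ready IO
# back to its owner.
control_read, control_write = IO.pipe
child_a = IO.pipe
child_b = IO.pipe

# Read end of each child pipe => label standing in for a child object.
io_map = { child_a.first => :a, child_b.first => :b }
messages = []

child_b.last.write('hello') # a pending child message
control_write.write('.')    # a pending wake-up request

loop do
  ready, = IO.select(io_map.keys + [control_read], nil, nil, 0.1)
  next unless ready # timed out with nothing readable: poll again

  ready.each do |io|
    next if io == control_read
    messages << [io_map.fetch(io), io.read_nonblock(1024)]
  end

  if ready.include?(control_read)
    control_read.read_nonblock(1024) # drain; the content is irrelevant
    break
  end
end
```

The timeout passed to IO.select keeps the loop from blocking forever when the set of pipes to watch changes between iterations.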
#wakeup ⇒ Object
Wake up the dispatcher thread.
# File 'lib/procrastinate/process_manager.rb', line 55
def wakeup
  control_pipe.last.write '.'
# rescue IOError
  # Ignore:
end
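Writing a byte to a pipe to wake a sleeping thread is commonly called the self-pipe pattern. The sketch below illustrates it with two threads; it is a standalone example with made-up variable names, not the library's dispatcher.

```ruby
# Standalone sketch: one thread sleeps in IO.select until another thread
# writes a byte to the pipe.
reader, writer = IO.pipe
woken = false

waiter = Thread.new do
  # Sleep until the read end becomes readable.
  ready, = IO.select([reader])

  # Drain the wake-up byte; its value carries no meaning.
  reader.read_nonblock(1024)
  woken = ready.include?(reader)
end

writer.write('.') # the equivalent of calling #wakeup
waiter.join
```

Because the byte stays in the pipe until it is read, the wake-up is not lost even if it is written before the other thread starts waiting.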