Class: Procrastinate::ProcessManager

Inherits:
Object
Defined in:
lib/procrastinate/process_manager.rb

Overview

Dispatches tasks and handles their completion. This class contains only low-level Unix process manipulation, no strategy. The only methods you should call from the outside are #setup, #step, #wakeup and #teardown.

Constructor Details

#initialize ⇒ ProcessManager

Returns a new instance of ProcessManager.



# File 'lib/procrastinate/process_manager.rb', line 19

def initialize
  # This controls process manager wakeup
  @control_pipe = IO.pipe
  
  # All presently running children
  @children = {}
end

Instance Attribute Details

#children ⇒ Object (readonly)

A hash that maps each pid to the tracking object (holding the completion callback) of a child process we spawned. Once the process is complete, the callback is called in the procrastinate thread.



# File 'lib/procrastinate/process_manager.rb', line 17

def children
  @children
end

#control_pipe ⇒ Object (readonly)

This pipe is used to wait for events in the master process.



# File 'lib/procrastinate/process_manager.rb', line 12

def control_pipe
  @control_pipe
end
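
Waiting on a pipe like this is commonly known as the self-pipe trick. The following is a minimal sketch of that technique, not Procrastinate's own code: the `Waker` class and its method names are hypothetical, and only `IO.pipe`, `IO.select`, `write_nonblock` and `read_nonblock` are real Ruby APIs.

```ruby
# Hypothetical illustration of the self-pipe trick (not part of Procrastinate).
class Waker
  def initialize
    @read_end, @write_end = IO.pipe
  end

  # May be called from another thread or from a signal handler.
  def wakeup
    @write_end.write_nonblock('.')
  rescue IO::WaitWritable
    # The pipe is full, so a wakeup is already pending. Nothing to do.
  end

  # Sleeps until #wakeup was called or `timeout` seconds have elapsed.
  # Returns true when woken up and false on timeout.
  def wait(timeout = nil)
    ready, = IO.select([@read_end], nil, nil, timeout)
    return false unless ready

    @read_end.read_nonblock(1024) # drain the pipe; the content is irrelevant
    true
  end
end
```

The sketch writes with `write_nonblock` so that a full pipe can never block the caller. That is an assumption of this example; the #wakeup method documented on this page uses a plain `write`.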

Instance Method Details

#cleanup ⇒ Object

Executed in the child process to release the signal handlers, file handles and pipes inherited from the master.



# File 'lib/procrastinate/process_manager.rb', line 198

def cleanup
  # Children don't need the parent's signal handler
  unregister_signals
  
  # The child doesn't need the control pipe for now.
  control_pipe.each { |io| io.close }
end

#create_process(task, &completion_handler) ⇒ void

This method returns an undefined value.

Spawns a process to work on task. If a block is given, it is called when the task completes. Call this method only from a strategy, inside the dispatcher's thread; calling it from any other thread is not thread-safe.

Examples:

create_process(wi) { puts "Task is complete" }

Parameters:

  • task (Procrastinate::Task::Callable)

    task to be run inside the forked process

  • completion_handler (Proc)

    completion handler that is called when the process exits



# File 'lib/procrastinate/process_manager.rb', line 166

def create_process(task, &completion_handler)
  # Tasks that are interested in getting messages from their children must
  # provide a result object that handles incoming 'result' messages.
  result = task.result
  child_process = ChildProcess.new(completion_handler, result)
  
  pid = fork do
    cleanup
          
    if result
      endpoint = lambda { |obj| child_process.send_message(obj) }
      task.run(endpoint)
    else
      task.run(nil)
    end

    exit! # this seems to be needed to avoid rspecs cleanup tasks
  end
      
  # This should never fire: New children are spawned only after we lose
  # track of the old ones because they have been successfully processed.
  fail "PID REUSE!" if children.has_key?(pid)
  
  # The spawning is done in the same thread as the reaping is done. This is 
  # why no race condition to the following line exists. (or in other code, 
  # for that matter.)
  children[pid] = child_process.tap { |s| s.start }
end
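
The fork-and-report pattern used above can be tried in isolation. The sketch below is an assumption-laden stand-in, not the library's implementation: `spawn_with_pipe` is a hypothetical helper, and it uses a plain `IO.pipe` plus `Marshal` where the real code relies on `ChildProcess#send_message`.

```ruby
# Hypothetical helper (not part of Procrastinate): runs the given block in a
# forked child and returns [pid, reader]. The child's return value can be
# read from `reader` once the child has finished.
def spawn_with_pipe
  reader, writer = IO.pipe
  pid = fork do
    reader.close                       # the child only writes
    writer.write(Marshal.dump(yield))  # send the block's result to the parent
    writer.close
    exit!(0)                           # skip at_exit handlers inherited from the parent
  end
  writer.close                         # the parent only reads
  [pid, reader]
end

pid, reader = spawn_with_pipe { 6 * 7 }
result = Marshal.load(reader.read)     # blocks until the child closes its end
reader.close
_, status = Process.wait2(pid)         # reap the child so it does not linger as a zombie
```

Closing the unused pipe end on each side matters: if the parent kept `writer` open, `reader.read` would never see end-of-file.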

#finalize_children ⇒ Object



# File 'lib/procrastinate/process_manager.rb', line 124

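def finalize_children
  # Let every stopped child deliver its final notifications.
  children.
    select { |pid, child| child.stopped? }.
    each { |pid, child| child.finalize }

  # Stop tracking children that are no longer needed.
  children.delete_if { |pid, child| child.removable? }
end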

#kill_processes(signal = 'QUIT') ⇒ Object

Sends signal (QUIT by default) to all running child processes.

Parameters:

  • signal (String) (defaults to: 'QUIT')

    signal to send to the forked processes.



# File 'lib/procrastinate/process_manager.rb', line 221

def kill_processes(signal='QUIT')
  children.each do |pid, process|
    Process.kill(signal, pid)
  end
end
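
`Process.kill` raises `Errno::ESRCH` when the target process no longer exists, which can happen if a child has been reaped in the meantime. A defensive variant might look like the sketch below; `signal_all` is a hypothetical helper and not part of this class.

```ruby
# Hypothetical helper (not part of Procrastinate): sends `signal` to every
# pid and returns the pids that could actually be signalled.
def signal_all(pids, signal = 'QUIT')
  pids.select do |pid|
    begin
      Process.kill(signal, pid)
      true
    rescue Errno::ESRCH
      false # the process is already gone
    end
  end
end
```

Note that a child which has exited but has not yet been reaped still accepts signals without error, so a returned pid does not prove the process is still running.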

#process_count ⇒ Object

Returns the number of child processes that are currently tracked. Note that a child process still counts towards this number even if it is marked dead internally, since its results may not have been dispatched yet.



# File 'lib/procrastinate/process_manager.rb', line 65

def process_count
  children.count
end

#reap_childs ⇒ Object

Calls completion handlers for all the children that have now exited.



# File 'lib/procrastinate/process_manager.rb', line 135

def reap_childs
  loop do
    child_pid, status = Process.waitpid(-1, Process::WNOHANG)
    break unless child_pid

    # Trigger the completion callback
    if child=children[child_pid]
      child.sigchld_received 
      # Maybe there are messages queued for this child. If nothing is queued
      # up, the thread will hang in the select in #wait_for_event unless
      # we wake it up. 
      wakeup
    end
  end
rescue Errno::ECHILD
  # Ignore: This means that no children remain.
end
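
The non-blocking reap loop can be exercised on its own. In the sketch below, `reap_exited` is a hypothetical helper. Because `Process.waitpid` returns only the pid, the sketch uses `Process.waitpid2` to obtain the exit status as well.

```ruby
# Hypothetical helper (not part of Procrastinate): collects every child that
# has already exited, without blocking on children that are still running.
# Returns a hash of pid => Process::Status.
def reap_exited
  reaped = {}
  loop do
    pid, status = Process.waitpid2(-1, Process::WNOHANG)
    break unless pid # nil: children exist, but none has exited yet

    reaped[pid] = status
  end
  reaped
rescue Errno::ECHILD
  reaped # there are no children left at all
end
```

With `WNOHANG` the call returns `nil` instead of blocking, which is what allows the loop to sit inside an event loop that must stay responsive.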

#register_signals ⇒ Object

Registers the signal handlers used for child management. NB: Because signal handlers are installed globally for the process, holding more than one dispatcher in a process will not work yet.



# File 'lib/procrastinate/process_manager.rb', line 74

def register_signals
  trap('CHLD') { wakeup }
end

#setup ⇒ Object

Sets up resource usage for the dispatcher. You must call this before the dispatcher can start its work.



# File 'lib/procrastinate/process_manager.rb', line 30

def setup
  register_signals
end

#step ⇒ Object

Performs one step in the dispatcher's work. This will sleep and wait for work to be done, then wake up and reap the child processes that have terminated in the meantime. This method will mostly sleep.



# File 'lib/procrastinate/process_manager.rb', line 38

def step
  # Sleep until either work arrives or we receive a SIGCHLD
  wait_for_event
  # Reap all processes that have terminated in the meantime.
  reap_childs
end

#teardown ⇒ Object

Tears down the dispatcher. This frees resources that have been allocated and waits for all children to terminate.



# File 'lib/procrastinate/process_manager.rb', line 48

def teardown
  wait_for_all_childs
  unregister_signals
end

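#unregister_signals ⇒ Object

Unregisters signals by resetting the CHLD handler to DEFAULT. This leaves the process as it was before, provided no custom CHLD handler was installed prior to #register_signals.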



# File 'lib/procrastinate/process_manager.rb', line 80

def unregister_signals
  trap('CHLD', 'DEFAULT')
end
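
`Signal.trap` returns the handler that was previously installed, so the earlier state can be restored exactly instead of forcing DEFAULT. The sketch below shows that alternative; `with_chld_handler` is a hypothetical helper and not how this class works.

```ruby
# Hypothetical helper (not part of Procrastinate): installs `handler` for
# SIGCHLD while the block runs, then restores the previous handler.
def with_chld_handler(handler)
  previous = trap('CHLD', handler) # trap returns the old handler
  begin
    yield
  ensure
    # nil means the old handler was not installed through Ruby;
    # fall back to DEFAULT in that case.
    trap('CHLD', previous || 'DEFAULT')
  end
end
```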

#wait_for_all_childs ⇒ Object

Waits for all children to complete.



# File 'lib/procrastinate/process_manager.rb', line 208

def wait_for_all_childs
  # TODO Maybe signal KILL to children after some time. 
  until children.empty?
    wait_for_event
    reap_childs
    finalize_children
  end
end

#wait_for_event ⇒ Object

Called from the child management thread; puts that thread to sleep until someone requests it to become active again. See #wakeup.

This method also drains the child queue, reading end-of-processing messages from all children and dispatching them to the matching child objects.



# File 'lib/procrastinate/process_manager.rb', line 90

def wait_for_event
  cp_read_end = control_pipe.first
  
  loop do # until we have input in the cp_read_end (control_pipe)
    io_map = children.inject({}) { |map, (_, child)|
      map[child.master_pipe] = child; map }

    ready, _, _ = IO.select(io_map.keys + [cp_read_end], [], [], 0.1)
    next unless ready

    # Process all messages that were sent from our children to us.
    ready.each { |io| 
      next if io == cp_read_end

      child = io_map[io]

      fail "Assert: All IOs correspond to a child" unless child
      child.read_message 
    }
         
    # Send the final notifications to the tracking objects of the child
    # processes and remove them from the children hash. At this point we
    # know that no messages are waiting in the child queue.
    finalize_children
    
    if ready.include?(cp_read_end)
      # Consume the data (not important)
      cp_read_end.read_nonblock(1024)
      # And return to our caller. This is the event we've been waiting for. 
      return
    end
  end
end
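
The core of this method is a single `IO.select` call over several pipes, with a hash that maps each IO back to its owner. A stripped-down sketch of that multiplexing step follows; `ready_sources` and the labels are hypothetical and only stand in for the child objects used above.

```ruby
# Hypothetical helper (not part of Procrastinate): `sources` maps IO objects
# to arbitrary labels. Waits up to `timeout` seconds and returns the labels
# of all IOs that have data (or end-of-file) pending.
def ready_sources(sources, timeout)
  ready, = IO.select(sources.keys, nil, nil, timeout)
  return [] unless ready # nil means the timeout elapsed

  ready.map { |io| sources.fetch(io) }
end
```

A short timeout, such as the 0.1 seconds used above, turns the wait into a periodic poll, so children spawned after the call started are picked up on the next iteration.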

#wakeup ⇒ Object

Wake up the dispatcher thread.



# File 'lib/procrastinate/process_manager.rb', line 55

def wakeup
  control_pipe.last.write '.'
# rescue IOError
  # Ignore:
end