Class: Procrastinate::ProcessManager

Inherits:
Object
Includes:
IPC
Defined in:
lib/procrastinate/process_manager.rb

Overview

Dispatches tasks and handles their completion. This class contains only low-level Unix process manipulation, no strategy. The only methods you should call from the outside are #setup, #step, #wakeup and #teardown.

Defined Under Namespace

Classes: ChildProcess, ObjectEndpoint

Constructor Details

#initialize ⇒ ProcessManager

Returns a new instance of ProcessManager.



# File 'lib/procrastinate/process_manager.rb', line 22

def initialize
  # This controls process manager wakeup
  @control_pipe = IO.pipe
  
  # All presently running children
  @children = {}
  
  # Child Master Communication (cmc)
  endpoint = Endpoint.anonymous
  @cmc_server = endpoint.server
  @cmc_client = endpoint.client
end

Instance Attribute Details

#children ⇒ Object (readonly)

A hash of <pid, callback> that contains callbacks for all the child processes we spawn. Once the process is complete, the callback is called in the procrastinate thread.



# File 'lib/procrastinate/process_manager.rb', line 20

def children
  @children
end

#control_pipe ⇒ Object (readonly)

This pipe is used to wait for events in the master process.



# File 'lib/procrastinate/process_manager.rb', line 15

def control_pipe
  @control_pipe
end

Instance Method Details

#cleanup ⇒ Object

Gets executed in child process to clean up file handles and pipes that the master holds.



# File 'lib/procrastinate/process_manager.rb', line 203

def cleanup
  # Children don't need the parent's signal handler
  unregister_signals
  
  # The child doesn't need the control pipe for now.
  control_pipe.each { |io| io.close }
end
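
A minimal sketch of why this cleanup is safe, using a plain IO.pipe as a stand-in for the control pipe (the variable names are illustrative and not part of the library). File descriptors are duplicated across fork, so closing them in the child should leave the parent's copies usable.

```ruby
reader, writer = IO.pipe

pid = fork do
  # Child: close the inherited copies of both pipe ends.
  [reader, writer].each { |io| io.close }
  exit!(0)
end
Process.waitpid(pid)

# Parent: its own descriptors are unaffected by the child's close calls.
writer.write('.')
echoed = reader.read_nonblock(1)
```

After the child has exited, the parent can still write to and read from the pipe.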

#create_process(task, &completion_handler) ⇒ Object

Spawns a process to work on task. If a block is given, it is called when the task completes. Call this method only from a strategy running inside the dispatcher's thread; calling it from any other thread exposes threading issues.

Example:

create_process(task) { puts "Task is complete" }


# File 'lib/procrastinate/process_manager.rb', line 176

def create_process(task, &completion_handler)
  # Tasks that are interested in getting messages from their children must
  # provide a result object that handles incoming 'result' messages.
  result = task.result
  
  pid = fork do
    cleanup
          
    if result
      endpoint = ObjectEndpoint.new(@cmc_client, Process.pid)
      task.run(endpoint)
    else
      task.run(nil)
    end

    exit! # skips at_exit handlers; this seems to be needed to avoid RSpec's cleanup tasks
  end
  
  # The spawning is done in the same thread as the reaping is done. This is 
  # why no race condition to the following line exists. (or in other code, 
  # for that matter.)
  children[pid] = ChildProcess.new(completion_handler, result).tap { |s| s.start }
end
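
The fork-and-remember-a-callback pattern used here can be sketched in isolation. TinySpawner, spawn_task and wait_all are hypothetical names invented for this illustration, and tasks are assumed to be plain lambdas rather than the library's task objects.

```ruby
# Hypothetical stand-in for the manager: maps pid => completion callback.
class TinySpawner
  def initialize
    @callbacks = {}
  end

  # Forks a child that runs the task and remembers the callback by pid.
  def spawn_task(task, &on_complete)
    pid = fork do
      task.call
      exit!(0) # leave without running at_exit handlers inherited from the parent
    end
    @callbacks[pid] = on_complete
    pid
  end

  # Blocks until every spawned child has exited, firing each callback once.
  def wait_all
    until @callbacks.empty?
      pid = Process.wait
      callback = @callbacks.delete(pid)
      callback.call(pid) if callback
    end
  end
end

spawner = TinySpawner.new
finished = []
pid = spawner.spawn_task(-> { sleep 0.01 }) { |done_pid| finished << done_pid }
spawner.wait_all
```

Because spawning and reaping both happen on the calling thread in this sketch, the callback table needs no locking, which mirrors the reasoning in the comment above.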

#decode_and_handle_message(msg) ⇒ Object

Called for every message sent from a child. The msg param here is a string that still needs decoding.



# File 'lib/procrastinate/process_manager.rb', line 140

def decode_and_handle_message(msg)
  pid, obj = Marshal.load(msg)
  if child=children[pid]
    child.incoming_message(obj)
  else
    warn "Communication from child #{pid} received, but child is gone."
  end
rescue => b
  # Messages that cannot be unmarshalled will be ignored. 
  warn "Can't unmarshal child communication."
end
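
As a rough illustration of the wire format implied above, a message can be treated as a marshalled [pid, object] pair. decode_message is a hypothetical helper written for this sketch; it only shows the round trip and the handling of undecodable input.

```ruby
# Hypothetical helper: returns [pid, obj], or nil if the bytes are not
# valid Marshal data.
def decode_message(raw)
  pid, obj = Marshal.load(raw)
  [pid, obj]
rescue TypeError, ArgumentError
  nil
end

raw = Marshal.dump([Process.pid, { result: 42 }])
pid, obj = decode_message(raw)

broken = decode_message("garbage")
```

Only feed Marshal.load data from processes you trust, such as your own forked children, since it can instantiate arbitrary objects.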

#kill_children ⇒ Object

Removes every child that is marked dead from #children. Despite its name, it sends no signals.



# File 'lib/procrastinate/process_manager.rb', line 122

def kill_children
  children.delete_if { |pid, child| child.dead? }
end

#process_count ⇒ Object

Returns the number of child processes that are alive at this point. Note that even if a child process is marked dead internally, it counts towards this number, since its results may not have been dispatched yet.



# File 'lib/procrastinate/process_manager.rb', line 73

def process_count
  children.count
end
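
The counting behaviour described above can be shown with a small stand-in. Child is a hypothetical struct that only tracks a dead flag; it is not the library's ChildProcess, and the pids are made up.

```ruby
# Hypothetical stand-in for ChildProcess: only knows whether it is dead.
Child = Struct.new(:dead) do
  def dead?
    dead
  end
end

children = { 101 => Child.new(false), 102 => Child.new(true) }

# A child marked dead still counts until it is removed from the hash.
count_before = children.count

# The same pruning that kill_children performs.
children.delete_if { |_pid, child| child.dead? }
count_after = children.count
```

Here count_before is 2 and count_after is 1, so the count only drops once dead entries have been pruned.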

#read_child_messages ⇒ Object

Once the @cmc_server endpoint is ready, loops and reads all child communication.



# File 'lib/procrastinate/process_manager.rb', line 128

def read_child_messages
  loop do
    msg = @cmc_server.receive
    decode_and_handle_message(msg)
    
    break unless @cmc_server.waiting?
  end
end

#reap_childs ⇒ Object

Calls completion handlers for all the children that have now exited.



# File 'lib/procrastinate/process_manager.rb', line 154

def reap_childs
  loop do
    # waitpid returns only the pid, or nil while no child has exited yet
    child_pid = Process.waitpid(-1, Process::WNOHANG)
    break unless child_pid

    # Trigger the completion callback
    if child=children[child_pid]
      child.died 
    end
  end
rescue Errno::ECHILD
  # Ignore: This means that no children remain.
end
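
A standalone sketch of the non-blocking reap loop, under the assumption that only this script forks children. reap_exited_children is a hypothetical helper, and the polling loop with sleep exists only to make the example finish; the class above waits for SIGCHLD instead.

```ruby
# Hypothetical helper: collects every child that has already exited and
# returns the reaped pids without blocking.
def reap_exited_children
  reaped = []
  loop do
    pid = Process.waitpid(-1, Process::WNOHANG)
    break unless pid # nil: children exist, but none has exited yet
    reaped << pid
  end
  reaped
rescue Errno::ECHILD
  reaped # no children are left at all
end

pids = 3.times.map { fork { exit!(0) } }

collected = []
until collected.size == pids.size
  collected.concat(reap_exited_children)
  sleep 0.01
end
```

Process::WNOHANG makes waitpid return immediately, so a single call to the helper never blocks the caller.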

#register_signals ⇒ Object

Register signals that aid in child care. NB: Because we do this globally, holding more than one dispatcher in a process will not work yet.



# File 'lib/procrastinate/process_manager.rb', line 82

def register_signals
  trap('CHLD') { wakeup }
end
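
A minimal sketch of a SIGCHLD handler that wakes a waiting process, assuming a plain IO.pipe in place of the control pipe. The variable names are illustrative, and the 5 second timeout is only a safety net for the example.

```ruby
reader, writer = IO.pipe

# Install a process-wide SIGCHLD handler that pokes the pipe.
trap('CHLD') { writer.write_nonblock('.') }

pid = fork { exit!(0) }

# Sleep until the handler has written to the pipe (or the timeout expires).
ready, = IO.select([reader], nil, nil, 5)
woke_up = !ready.nil?

Process.waitpid(pid)

# Restore the default disposition, as unregister_signals does.
trap('CHLD', 'DEFAULT')
```

Since trap replaces the handler for the whole process, two objects installing their own CHLD handlers would overwrite each other, which is the limitation the NB above refers to.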

#setup ⇒ Object

Sets up the resources the dispatcher needs. You must call this before the dispatcher can start its work.



# File 'lib/procrastinate/process_manager.rb', line 38

def setup
  register_signals
end

#step ⇒ Object

Performs one step in the dispatcher's work. This will sleep and wait for work to be done, then wake up and reap processes that are still pending. This method will mostly sleep.



# File 'lib/procrastinate/process_manager.rb', line 46

def step
  # Sleep until either work arrives or we receive a SIGCHLD
  wait_for_event
  # Reap all processes that have terminated in the meantime.
  reap_childs
end

#teardown ⇒ Object

Tears down the dispatcher. This frees resources that have been allocated and waits for all children to terminate.



# File 'lib/procrastinate/process_manager.rb', line 56

def teardown
  wait_for_all_childs
  unregister_signals
end

#unregister_signals ⇒ Object

Unregisters the signal handlers by resetting SIGCHLD to its default handling.



# File 'lib/procrastinate/process_manager.rb', line 88

def unregister_signals
  trap('CHLD', 'DEFAULT')
end

#wait_for_all_childs ⇒ Object

Waits for all children to complete.



# File 'lib/procrastinate/process_manager.rb', line 213

def wait_for_all_childs
  # TODO Maybe signal KILL to children after some time. 
  until children.all? { |p, c| c.dead? }
    wait_for_event
    reap_childs
  end
end

#wait_for_event ⇒ Object

Called from the child management thread, will put that thread to sleep until someone requests it to become active again. See #wakeup.



# File 'lib/procrastinate/process_manager.rb', line 95

def wait_for_event
  cp_read_end = control_pipe.first
  
  loop do # until we have input in the cp_read_end (control_pipe)
    ready = Endpoint.select([cp_read_end, @cmc_server])
    
    read_child_messages if ready.include? @cmc_server

    # Kill children here, since we've just depleted the communication
    # endpoint. This avoids the situation where the child process
    # communicates but we remove it from our records before it can be told
    # about it.
    kill_children
    
    if ready.include? cp_read_end
      # Consume the data (not important)
      cp_read_end.read_nonblock(1024)
      return
    end
  end

# rescue Errno::EAGAIN, Errno::EINTR
  # TODO Is this needed?
  # A signal has been received. Mostly, this is as if we had received
  # something in the control pipe.
end
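
The wait loop can be approximated with Ruby's built-in IO.select. This is a sketch under the assumption that two ordinary pipes stand in for the control pipe and the child communication endpoint; Endpoint.select itself is not used, and all variable names are illustrative.

```ruby
control_r, control_w = IO.pipe   # stand-in for the control pipe
message_r, message_w = IO.pipe   # stand-in for the child message endpoint

# A child message is already pending; the wakeup arrives a little later.
message_w.write("hello")
waker = Thread.new do
  sleep 0.05
  control_w.write('.')
end

messages = []
loop do
  ready, = IO.select([control_r, message_r])

  # Drain pending messages first, so none is lost before returning.
  messages << message_r.read_nonblock(1024) if ready.include?(message_r)

  if ready.include?(control_r)
    control_r.read_nonblock(1024) # consume the wakeup byte; its value is irrelevant
    break
  end
end
waker.join
```

The loop keeps servicing messages and only returns once the control pipe becomes readable, which matches the ordering of the method above.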

#wakeup ⇒ Object

Wake up the dispatcher thread.



# File 'lib/procrastinate/process_manager.rb', line 63

def wakeup
  control_pipe.last.write '.'
# rescue IOError
  # Ignore:
end