Class: Servolux::Prefork

Inherits:
Object
  • Object
show all
Defined in:
lib/servolux/prefork.rb

Overview

Synopsis

The Prefork class provides a pre-forking worker pool for executing tasks in parallel using multiple processes.

Details

A pre-forking worker pool is a technique for executing code in parallel in a UNIX environment. Each worker in the pool forks a child process and then executes user supplied code in that child process. The child process can pull jobs from a queue (beanstalkd for example) or listen on a socket for network requests.

The code to execute in the child processes is passed as a block to the Prefork initialize method. The child processes executes this code in a loop; that is, your code block should not worry about keeping itself alive. This is handled by the library.

If your code raises an exception, it will be captured by the library code and marshalled back to the parent process. This will halt the child process. The Prefork worker pool does not restart dead workers. A method is provided to iterate over workers that have errors, and it is up to the user to handle errors as they please.

Instead of passing a block to the initialize method, you can provide a Ruby module that defines an “execute” method. This method will be executed in the child process’ run loop. When using a module, you also have the option of defining a “before_executing” method and an “after_executing” method. These methods will be called before the child starts the execute loop and after the execute loop finishes. Each method will be called exactly once. Both methods are optional.

Sending a SIGHUP to a child process will cause that child to stop and restart. The child will send a signal to the parent asking to be shutdown. The parent will gracefully halt the child and then start a new child process to replace it. If you define a “hup” method in your worker module, it will be executed when SIGHUP is received by the child. Your “hup” method will be the last method executed in the signal handler.

This has the advantage of calling your before/after_executing methods again and reloading any code or resources your worker code will use. The SIGHUP will call Thread#wakeup on the main child process thread; please write your code to respond accordingly to this wakeup call (a thread waiting on a Queue#pop will not return when wakeup is called on the thread).

Examples

A pre-forking echo server: github.com/TwP/servolux/blob/master/examples/echo.rb

Pulling jobs from a beanstalkd work queue: github.com/TwP/servolux/blob/master/examples/beanstalk.rb

Before / After Executing

In this example, we are creating 42 worker processes that will log the process ID and the current time to a file. Each worker will do this every 2 seconds. The before/after_executing methods are used to open the file before the run loop starts and to close the file after the run loop completes. The execute method uses the stored file descriptor when logging the message.

module RunMe
  def before_executing
    @fd = File.open("#{Process.pid}.txt", 'w')
  end

  def after_executing
    @fd.close
  end

  def execute
    @fd.puts "Process #{Process.pid} @ #{Time.now}"
    sleep 2
  end
end

pool = Servolux::Prefork.new(:module => RunMe)
pool.start 42

Heartbeat

When a :timeout is supplied to the constructor, a “heartbeat” is setup between the parent and the child worker. Each loop through the child’s execute code must return before :timeout seconds have elapsed. If one iteration through the loop takes longer than :timeout seconds, then the parent process will halt the child worker. An error will be raised in the parent process.

pool = Servolux::Prefork.new(:timeout => 2) {
  puts "Process #{Process.pid} is running."
  sleep(rand * 5)
}
pool.start 42

Eventually all 42 child processes will be killed by their parents. The random number generator will eventually cause the child to sleep longer than two seconds.

What is happening here is that each time the child processes executes the block of code, the Servolux library code will send a “heartbeat” message to the parent. The parent is using a Kernel#select call on the communications pipe to wait for this message. The timeout is passed to the select call, and this will cause it to return nil – this is the error condition the heartbeat prevents.

Use the heartbeat with caution – allow margins for timing issues and processor load spikes.

Signals

Forked child processes are configured to respond to two signals: SIGHUP and SIGTERM. The SIGHUP signal when sent to a child process is used to restart just that one child. The SIGTERM signal when sent to a child process is used to forcibly kill the child; it will not be restarted. The parent process uses SIGTERM to halt all the children when it is stopping.

SIGHUP Child processes are restarted by sending a SIGHUP signal to the child. This will shutdown the child worker and then start up a new one to replace it. For the child to shutdown gracefully, it needs to return from the “execute” method when it receives the signal. Define a “hup” method that will wake the execute thread from any pending operations – listening on a socket, reading a file, polling a queue, etc. When the execute method returns, the child will exit.

SIGTERM Child processes are stopped by the prefork parent by sending a SIGTERM signal to the child. For the child to shutdown gracefully, it needs to return from the “execute” method when it receives the signal. Define a “term” method that will wake the execute thread from any pending operations – listening on a socket, reading a file, polling a queue, etc. When the execute method returns, the child will exit.

Defined Under Namespace

Classes: Worker

Constant Summary collapse

Timeout =
Class.new(::Servolux::Error)
UnknownSignal =
Class.new(::Servolux::Error)
UnknownResponse =
Class.new(::Servolux::Error)
START =

:stopdoc:

"\000START".freeze
HALT =
"\000HALT".freeze
ERROR =
"\000SHIT".freeze
HEARTBEAT =
"\000<3".freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}, &block) ⇒ Prefork

call-seq:

Prefork.new { block }
Prefork.new( :module => Module )

Create a new pre-forking worker pool. You must provide a block of code for the workers to execute in their child processes. This code block can be passed either as a block to this method or as a module via the :module option.

If a :timeout is given, then each worker will setup a “heartbeat” between the parent process and the child process. If the child does not respond to the parent within :timeout seconds, then the child process will be halted. If you do not want to use the heartbeat then leave the :timeout unset or manually set it to nil.

The pre-forking worker pool makes no effort to restart dead workers. It is left to the user to implement this functionality.

Raises:

  • (ArgumentError)


162
163
164
165
166
167
168
169
170
# File 'lib/servolux/prefork.rb', line 162

def initialize( opts = {}, &block )
  @timeout = opts[:timeout]
  @module = opts[:module]
  @module = Module.new { define_method :execute, &block } if block
  @workers = []
  @harvest = Queue.new

  raise ArgumentError, 'No code was given to execute by the workers.' unless @module
end

Instance Attribute Details

#harvestObject (readonly)

List of child PIDs that need to be reaped



142
143
144
# File 'lib/servolux/prefork.rb', line 142

def harvest
  @harvest
end

#timeoutObject

:startdoc:



141
142
143
# File 'lib/servolux/prefork.rb', line 141

def timeout
  @timeout
end

Instance Method Details

#each_worker(&block) ⇒ Object

call-seq:

each_worker { |worker| block }

Iterates over all the works and yields each, in turn, to the given block.



234
235
236
237
# File 'lib/servolux/prefork.rb', line 234

def each_worker( &block )
  @workers.each(&block)
  self
end

#errorsObject

call-seq:

errors { |worker| block }

Iterates over all the works and yields the worker to the given block only if the worker has an error condition.



245
246
247
248
# File 'lib/servolux/prefork.rb', line 245

def errors
  @workers.each { |worker| yield worker unless worker.error.nil? }
  self
end

#reapPrefork

This method should be called periodically in order to clear the return status from child processes that have either died or been restarted (via a HUP signal). This will remove zombie children from the process table.

Returns:



206
207
208
209
210
211
212
# File 'lib/servolux/prefork.rb', line 206

def reap
  while !@harvest.empty?
    pid = @harvest.pop
    Process.wait pid rescue nil
  end
  self
end

#signal(signal = 'TERM') ⇒ Prefork Also known as: kill

Send this given signal to all child process. The default signal is ‘TERM’. The method waits for a short period of time after the signal is sent to each child; this is done to alleviate a flood of signals being sent simultaneously and overwhemlming the CPU.

Parameters:

  • signal (String, Integer) (defaults to: 'TERM')

    The signal to send to child processes.

Returns:



222
223
224
225
# File 'lib/servolux/prefork.rb', line 222

def signal( signal = 'TERM' )
  @workers.each { |worker| worker.signal(signal); pause }
  self
end

#start(number) ⇒ Prefork

Start up the given number of workers. Each worker will create a child process and run the user supplied code in that child process.

Parameters:

  • number (Integer)

    The number of workers to prefork

Returns:



178
179
180
181
182
183
184
185
186
187
# File 'lib/servolux/prefork.rb', line 178

def start( number )
  @workers.clear

  number.times {
    @workers << Worker.new(self)
    @workers.last.extend @module
  }
  @workers.each { |worker| worker.start; pause }
  self
end

#stopObject

Stop all workers. The current process will wait for each child process to exit before this method will return. The worker instances are not destroyed by this method; this means that the each_worker and the errors methods will still function correctly after stopping the workers.



194
195
196
197
198
# File 'lib/servolux/prefork.rb', line 194

def stop
  @workers.each { |worker| worker.stop; pause }
  reap
  self
end