MultiProcessing
MultiProcessing provides classes for inter-process synchronization and communication in Ruby.
These classes can be used like ones in Ruby standard library for multi threading.
To realize communicating across processes, MultiProcessing uses pipes (IO.pipe). You have to use #fork to create process which accesses synchonizing/communication object. For this reason, MultiProcessing can be used only on Unix or Linux.
Install
gem install multiprocessing
Provided Classes
- Mutex
- ConditionVariable
- Semaphore
- Queue
- (ExternalObject: under development)
In addition, IO.name_pipe is added.
Mutex
Process version of Mutex. It can be used like ::Mutex in Ruby standard library.
Example:
require 'multiprocessing'
mutex = MultiProcessing::Mutex.new
3.times do
fork do
mutex.synchronize do
# critical section
puts Process.pid
sleep 1
end
end
end
Process.waitall
# => prints 3 pids of forked process in 1 sec interval
Note: Do not fork in critical section.
ConditionVariable
Process version of ConditionVariable. It can be used like ::ConditionVariable in Ruby standard library.
Example:
require 'multiprocessing'
m = MultiProcessing::Mutex.new
cond = MultiProcessing::ConditionVariable.new
3.times do
fork do
m.synchronize do
puts "waiting pid:#{Process.pid}"
cond.wait(m)
puts "restarted pid:#{Process.pid}"
end
end
end
sleep 0.1 # => 3 processes get waiting a signal
cond.signal # => One process restarts
cond.broadcast # => Remaining 2 process restart
Process.waitall
Semaphore
Semaphore is like Mutex but it can manage multiple resources. It is initialized with initiali number of resources. It can be release from the thread or process which didn't lock the semaphore.
Example:
require 'multiprocessing'
s = MultiProcessing::Semaphore.new 2
3.times do
fork do
s.synchronize do
puts "pid: #{Process.pid}"
sleep 1
end
end
end
Process.waitall
# => two processes prints its pid immediately
# but the other does late.
Queue
Process version of Queue. It provides away to communication between processes. It can be used like ::Queue in Ruby standard library.
Queue usees pipes to communicate with other processes. Queue#push} starts background thread ti write data to the pipe. Avoiding to exit process before writing to the pipe, use Queue#close and Queue#join_thread. Queue#join_thread waits until all data is written to the pipe.
Example:
require 'multiprocessing'
q = MultiProcessing::Queue.new
fork do
q.push :nyan
q.push :wan
q.close.join_thread
end
q.pop # => :nyan
q.pop # => :wan