Class: MultiProcessing::ConditionVariable

Inherits:
Object
  • Object
show all
Defined in:
lib/multiprocessing/conditionvariable.rb

Overview

Process version of ConditionVariable This can be used like ::ConditionVariable in Ruby standard library.

Note that ConditionVariable uses 2 pipes.

Examples:

require 'multiprocessing'

m = MultiProcessing::Mutex.new
cond = MultiProcessing::ConditionVariable.new
3.times do
  fork do
    m.synchronize do
      puts "waiting pid:#{Process.pid}"
      cond.wait(m)
      puts "restarted pid:#{Process.pid}"
    end
  end
end
sleep 0.1 # => 3 processes get waiting a signal
cond.signal # => One process restarts
cond.broadcast # => Remaining 2 process restart
Process.waitall

Instance Method Summary collapse

Constructor Details

#initializeConditionVariable

Returns a new instance of ConditionVariable.



34
35
36
37
# File 'lib/multiprocessing/conditionvariable.rb', line 34

def initialize
  @waiting_pout,@waiting_pin = IO.pipe
  @signal_pout,@signal_pin = IO.pipe
end

Instance Method Details

#broadcastFixnum

Wakes up all threads waiting for this lock.

Returns:

  • (Fixnum)

    Number of threads waked up



45
46
47
48
49
50
51
# File 'lib/multiprocessing/conditionvariable.rb', line 45

def broadcast
  n = 0
  while(signal)
    n += 1
  end
  return n
end

#signalBoolean

Note:

An order of waking up is indefinite.

Wakes up one of threads waiting for this lock.

Returns:

  • (Boolean)

    Returns true if wakes up. Returns false if no threads were waiting.



61
62
63
64
65
66
67
68
69
70
71
# File 'lib/multiprocessing/conditionvariable.rb', line 61

def signal
  MultiProcessing.try_handle_interrupt(RuntimeError => :never) do
    begin
      @waiting_pout.read_nonblock 1
      @signal_pin.syswrite 1
      return true
    rescue Errno::EAGAIN
      return false
    end
  end
end

#wait(mutex) ⇒ ConditionVariable

Note:

Do not pass an instance of ::Mutex. Pass an instance of MultiProcessing::Mutex.

Releases the lock held in mutex and waits, reacquires the lock on wakeup.

Parameters:

  • mutex (Mutex)

    An instance of MultiProcessing::Mutex. It must be locked.

Returns:

Raises:

  • (TypeError)
  • (ArgumentError)


84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/multiprocessing/conditionvariable.rb', line 84

def wait(mutex)
  MultiProcessing.try_handle_interrupt(RuntimeError => :never) do
    raise TypeError.new("mutex must be instance of MultiProcessing::Mutex") if mutex.class != MultiProcessing::Mutex
    raise ArgumentError.new("mutex must be locked") unless mutex.locked?
    @waiting_pin.syswrite 1
    mutex.unlock
    begin
      MultiProcessing.try_handle_interrupt(RuntimeError => :on_blocking) do
        @signal_pout.readpartial 1
      end
    rescue Exception => e
      @waiting_pout.readpartial 1
      raise e
    ensure
      mutex.lock
    end
    self
  end
end