Class: ConditionVariable

Inherits:
Object
  • Object
show all
Defined in:
lib/thread.rb

Overview

ConditionVariable objects augment class Mutex. Using condition variables, it is possible to suspend while in the middle of a critical section until a resource becomes available.

Example:

require 'thread'

mutex = Mutex.new
resource = ConditionVariable.new

a = Thread.new {
  mutex.synchronize {
    # Thread 'a' now needs the resource
    resource.wait(mutex)
    # 'a' can now have the resource
  }
}

b = Thread.new {
  mutex.synchronize {
    # Thread 'b' has finished using the resource
    resource.signal
  }
}

Instance Method Summary collapse

Constructor Details

#initializeConditionVariable

Creates a new ConditionVariable



54
55
56
57
# File 'lib/thread.rb', line 54

def initialize
  @waiters = {}
  @waiters_mutex = Mutex.new
end

Instance Method Details

#broadcastObject

Wakes up all threads waiting for this lock.



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/thread.rb', line 101

def broadcast
  Thread.handle_interrupt(StandardError => :on_blocking) do
    threads = nil
    @waiters_mutex.synchronize do
      threads = @waiters.keys
      @waiters.clear
    end
    for t in threads
      begin
        t.run
      rescue ThreadError
      end
    end
  end
  self
end

#signalObject

Wakes up the first thread in line waiting for this lock.



86
87
88
89
90
91
92
93
94
95
96
# File 'lib/thread.rb', line 86

def signal
  Thread.handle_interrupt(StandardError => :on_blocking) do
    begin
      t, _ = @waiters_mutex.synchronize { @waiters.shift }
      t.run if t
    rescue ThreadError
      retry # t was already dead?
    end
  end
  self
end

#wait(mutex, timeout = nil) ⇒ Object

Releases the lock held in mutex and waits; reacquires the lock on wakeup.

If timeout is given, this method returns after timeout seconds passed, even if no other thread doesn’t signal.



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/thread.rb', line 65

def wait(mutex, timeout=nil)
  Thread.handle_interrupt(StandardError => :never) do
    begin
      Thread.handle_interrupt(StandardError => :on_blocking) do
        @waiters_mutex.synchronize do
          @waiters[Thread.current] = true
        end
        mutex.sleep timeout
      end
    ensure
      @waiters_mutex.synchronize do
        @waiters.delete(Thread.current)
      end
    end
  end
  self
end