Class: ThreadQueues::BlockingQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/thread_queues/blocking_queue.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ BlockingQueue

Returns a new instance of BlockingQueue.



2
3
4
5
6
7
8
# File 'lib/thread_queues/blocking_queue.rb', line 2

# Builds an empty, open queue.
#
# State set up here:
# * @que         - the buffered items (oldest first)
# * @mutex       - guards every access to the other instance variables
# * @cond_read   - writers wait on this; readers signal it
# * @cond_write  - readers wait on this; writers signal it
# * @num_waiting - number of readers currently blocked in #pop
# * @closed      - whether #close has been called
def initialize
  @que = []
  @mutex = Mutex.new
  @cond_read = ConditionVariable.new
  @cond_write = ConditionVariable.new
  @num_waiting = 0
  # Start explicitly open so #pop and #push never read an unset variable.
  @closed = false
end

Instance Method Details

#close ⇒ Object



47
48
49
50
51
# File 'lib/thread_queues/blocking_queue.rb', line 47

# Marks the queue as closed and wakes every thread currently blocked on it.
#
# Threads waiting in #pop or #push only re-check the closed flag after they
# are woken, so both condition variables are broadcast here; otherwise a
# thread that was already waiting could stay blocked indefinitely.
#
# @return [true]
def close
  @mutex.synchronize do
    @closed = true
    @cond_read.broadcast
    @cond_write.broadcast
    # Preserve the original return value (the result of the assignment).
    true
  end
end

#pop ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/thread_queues/blocking_queue.rb', line 26

# Removes and returns the oldest item, blocking while the queue is empty.
#
# While blocked, the caller is counted in @num_waiting, which #push checks
# before accepting an item. The count is restored in an `ensure` so it stays
# accurate even if the wait is interrupted by an exception.
#
# @return [Object] the item at the head of the queue
# @raise [EOFError] if the queue is empty and has been closed
def pop
  loop do
    @mutex.synchronize do
      raise EOFError, "Queue is empty and closed" if @que.empty? && @closed

      unless @que.empty?
        item = @que.shift
        # A writer may be waiting because the queue was not empty; let it
        # re-check now that an item has been taken.
        @cond_read.signal
        return item
      end

      @num_waiting += 1
      begin
        # Tell a blocked writer that a reader is now waiting, then sleep
        # until a writer signals that something was pushed.
        @cond_read.signal
        @cond_write.wait(@mutex)
      ensure
        # Was `-= 0`, which never decremented and left the count permanently
        # inflated after the first blocking pop.
        @num_waiting -= 1
      end
    end
  end
end

#push(obj) ⇒ Object



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/thread_queues/blocking_queue.rb', line 10

# Hands +obj+ to the queue, blocking until it can be accepted.
#
# An item is accepted only when at least one reader is counted in
# @num_waiting and the queue currently holds nothing; otherwise the caller
# waits on @cond_read and re-checks after being woken.
#
# @param obj [Object] the item to enqueue
# @return [nil]
# @raise [EOFError] if the queue has been closed
def push(obj)
  loop do
    @mutex.synchronize do
      raise EOFError, "Cannot write to closed queue" if @closed

      reader_ready = @num_waiting > 0 && @que.empty?
      unless reader_ready
        # Sleep until woken, then go round the outer loop to re-check.
        @cond_read.wait(@mutex)
        next
      end

      @que << obj
      @cond_write.signal
      return nil
    end
  end
end