Class: ThreadQueues::BufferedQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/thread_queues/buffered_queue.rb

Instance Method Summary collapse

Constructor Details

#initialize(max_queued) ⇒ BufferedQueue

Returns a new instance of BufferedQueue.



4
5
6
7
8
9
10
11
# File 'lib/thread_queues/buffered_queue.rb', line 4

# Sets up an empty, open queue together with its synchronisation primitives.
#
# @param max_queued [Integer] threshold stored in @max_queued; #push compares
#   the current queue length against it before accepting an object
def initialize(max_queued)
  @que = []
  @mutex = Mutex.new
  @cond_read = ConditionVariable.new
  @cond_write = ConditionVariable.new
  @num_waiting = 0
  @max_queued = max_queued
  # Start explicitly open so the flag read by #push, #pop and #close is
  # always a defined boolean rather than an unset instance variable.
  @closed = false
end

Instance Method Details

#close ⇒ Object



56
57
58
59
60
# File 'lib/thread_queues/buffered_queue.rb', line 56

# Marks the queue as closed and wakes every thread currently blocked on
# either condition variable.
#
# Setting the flag alone is not enough: a thread already parked in a
# ConditionVariable#wait only re-checks @closed after it is woken, so without
# the broadcasts it could sleep forever once the queue is closed.
#
# @return [true]
def close
  @mutex.synchronize do
    @closed = true
    @cond_read.broadcast
    @cond_write.broadcast
    true
  end
end

#length ⇒ Object



52
53
54
# File 'lib/thread_queues/buffered_queue.rb', line 52

# Reports how many objects are currently sitting in the queue.
#
# Reads @que directly and does not take @mutex itself.
#
# @return [Integer] number of queued objects
def length
  @que.size
end

#pop ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/thread_queues/buffered_queue.rb', line 29

# Removes and returns the object at the front of the queue, blocking while
# the queue is empty.
#
# @return [Object] the oldest queued object
# @raise [EOFError] if the queue is empty and has been closed
def pop
  @mutex.synchronize do
    # Re-check the condition after every wake-up: another consumer may have
    # taken the object, or the queue may have been closed in the meantime.
    while @que.empty?
      raise EOFError, "Queue is empty and closed" if @closed

      @num_waiting += 1

      begin
        # Nudge a producer that may be blocked, then sleep until one pushes.
        @cond_read.signal
        @cond_write.wait(@mutex)
      ensure
        # Always undo the increment above so the waiter count cannot leak.
        @num_waiting -= 1
      end
    end

    obj = @que.shift
    # A slot was just freed; let one blocked producer proceed.
    @cond_read.signal
    obj
  end
end

#push(obj) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/thread_queues/buffered_queue.rb', line 13

# Appends an object to the queue, blocking while the queue is over its
# threshold.
#
# NOTE(review): the check is `length <= @max_queued`, so the queue can hold
# up to max_queued + 1 objects — confirm whether that extra slot is intended.
#
# @param obj [Object] the object to enqueue
# @return [nil]
# @raise [EOFError] if the queue has been closed
def push(obj)
  loop do
    @mutex.synchronize do
      raise EOFError, "Cannot write to closed queue" if @closed

      if length > @max_queued
        # No room yet: sleep until a consumer signals, then leave the
        # critical section and retry from the top.
        @cond_read.wait(@mutex)
        next
      end

      @que << obj
      # Let one blocked consumer know there is something to read.
      @cond_write.signal
      return nil
    end
  end
end