Class: Concurrent::Channel::BufferedChannel

Inherits:
Object
  • Object
show all
Defined in:
lib/concurrent/channel/buffered_channel.rb

Instance Method Summary collapse

Constructor Details

#initialize(size) ⇒ BufferedChannel

Returns a new instance of BufferedChannel.



10
11
12
13
14
15
16
# File 'lib/concurrent/channel/buffered_channel.rb', line 10

# Builds the channel's internal state.
#
# @param size [Object] forwarded unchanged to `RingBuffer.new`
#   (presumably the buffer capacity -- confirm against RingBuffer).
def initialize(size)
  # Data holders: the list of parked probes and the value buffer.
  @buffer = RingBuffer.new(size)
  @probe_set = WaitableList.new

  # Synchronization primitives; @mutex is taken by the methods that
  # read or modify @buffer.
  @buffer_condition = ConditionVariable.new
  @mutex = Mutex.new
end

Instance Method Details

#buffer_queue_size ⇒ Object



22
23
24
# File 'lib/concurrent/channel/buffered_channel.rb', line 22

# Reports the current element count of the internal buffer.
#
# The count is read while holding @mutex.
#
# @return [Object] whatever `@buffer.count` returns
def buffer_queue_size
  @mutex.synchronize do
    @buffer.count
  end
end

#pop ⇒ Object



31
32
33
34
35
# File 'lib/concurrent/channel/buffered_channel.rb', line 31

# Takes a value from the channel.
#
# Creates a fresh Channel::Probe, hands it to #select, and then returns
# the probe's `value`.
# NOTE(review): presumably `probe.value` waits until the probe has been
# set -- confirm against Channel::Probe.
#
# @return [Object] the result of `probe.value`
def pop
  Channel::Probe.new.tap { |probe| select(probe) }.value
end

#probe_set_size ⇒ Object



18
19
20
# File 'lib/concurrent/channel/buffered_channel.rb', line 18

# Reports how many probes are currently held in the probe set.
#
# Delegates directly to `@probe_set.size`; @mutex is not taken here.
#
# @return [Object] whatever `@probe_set.size` returns
def probe_set_size
  waiting_probes = @probe_set
  waiting_probes.size
end

#push(value) ⇒ Object



26
27
28
29
# File 'lib/concurrent/channel/buffered_channel.rb', line 26

# Offers a value to the channel, retrying until it is accepted.
#
# Repeatedly calls `set_probe_or_push_into_buffer(value)` and stops as
# soon as that call returns a truthy result.
#
# @param value [Object] the value to hand over
# @return [nil]
def push(value)
  delivered = false
  delivered = set_probe_or_push_into_buffer(value) until delivered
end

#remove_probe(probe) ⇒ Object



50
51
52
# File 'lib/concurrent/channel/buffered_channel.rb', line 50

# Removes the given probe from the probe set.
#
# @param probe [Object] the probe to remove
# @return [Object] whatever `@probe_set.delete` returns
def remove_probe(probe)
  waiting_probes = @probe_set
  waiting_probes.delete(probe)
end

#select(probe) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/concurrent/channel/buffered_channel.rb', line 37

# Registers a probe with the channel, or serves it immediately.
#
# Runs entirely while holding @mutex:
# * If `@buffer.empty?` is true, the probe is added to @probe_set via
#   `put` and the method returns true.
# * Otherwise `probe.try_set([peek_buffer, self])` is attempted; when it
#   returns a truthy result, `shift_buffer` is called and its result is
#   returned. When it does not, the buffer is left alone and nil is
#   returned.
#
# @param probe [Object] must respond to `try_set`
# @return [Object, nil] true, the result of `shift_buffer`, or nil
def select(probe)
  @mutex.synchronize do
    if @buffer.empty?
      # Nothing buffered: leave the probe in the probe set.
      @probe_set.put(probe)
      next true
    end

    # Only drop the head of the buffer once the probe has accepted it.
    accepted = probe.try_set([peek_buffer, self])
    accepted ? shift_buffer : nil
  end
end