Class: Concurrent::BufferedChannel

Inherits:
Object
  • Object
show all
Defined in:
lib/concurrent/channel/buffered_channel.rb

Instance Method Summary collapse

Constructor Details

#initialize(size) ⇒ BufferedChannel

Returns a new instance of BufferedChannel.



8
9
10
11
12
13
14
15
# File 'lib/concurrent/channel/buffered_channel.rb', line 8

def initialize(size)
  @mutex = Mutex.new
  @condition = Condition.new
  @buffer_condition = Condition.new

  @probe_set = WaitableList.new
  @buffer = RingBuffer.new(size)
end

Instance Method Details

#buffer_queue_sizeObject



21
22
23
# File 'lib/concurrent/channel/buffered_channel.rb', line 21

def buffer_queue_size
  @mutex.synchronize { @buffer.count }
end

#popObject



30
31
32
33
34
# File 'lib/concurrent/channel/buffered_channel.rb', line 30

def pop
  probe = Channel::Probe.new
  select(probe)
  probe.value
end

#probe_set_sizeObject



17
18
19
# File 'lib/concurrent/channel/buffered_channel.rb', line 17

def probe_set_size
  @probe_set.size
end

#push(value) ⇒ Object



25
26
27
28
# File 'lib/concurrent/channel/buffered_channel.rb', line 25

def push(value)
  until set_probe_or_push_into_buffer(value)
  end
end

#remove_probe(probe) ⇒ Object



49
50
51
# File 'lib/concurrent/channel/buffered_channel.rb', line 49

def remove_probe(probe)
  @probe_set.delete(probe)
end

#select(probe) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/concurrent/channel/buffered_channel.rb', line 36

def select(probe)
  @mutex.synchronize do

    if @buffer.empty?
      @probe_set.put(probe)
      true
    else
      shift_buffer if probe.set_unless_assigned(peek_buffer, self)
    end

  end
end