Class: Rabbit::Publishing::ChannelsPool::BaseQueue

Inherits:
Queue
  • Object
show all
Defined in:
lib/rabbit/publishing/channels_pool.rb

Direct Known Subclasses

ConfirmQueue

Instance Method Summary collapse

Constructor Details

#initialize(session, max_size) ⇒ BaseQueue

Returns a new instance of BaseQueue.



7
8
9
10
11
12
13
14
15
# File 'lib/rabbit/publishing/channels_pool.rb', line 7

# Sets up an empty queue together with the bookkeeping used to cap how many
# channels get created.
#
# @param session [Object] stored as-is in @session
# @param max_size [Integer] requested upper bound on the number of channels
def initialize(session, max_size)
  super()

  # Counter of channels accounted for so far; starts at zero.
  @ch_size = 0
  # NOTE(review): the limit is stored as one less than the value passed in;
  # the reason for the offset is not visible in this method - confirm.
  @max_size = max_size - 1
  @session = session

  # Two separate locks: one for the counter decrement, one for creation.
  @ch_dec_mon = Mutex.new
  @create_mon = Mutex.new
end

Instance Method Details

#add_channel ⇒ Object



31
32
33
34
35
36
37
38
# File 'lib/rabbit/publishing/channels_pool.rb', line 31

# Creates one more channel and enqueues it, unless the channel counter has
# already reached the stored limit.
#
# The whole check-create-enqueue sequence runs under @create_mon, so two
# callers cannot both pass the limit check for the same free slot.
#
# @return [Integer, nil] the updated channel counter, or nil when the limit
#   was already reached and nothing was created
def add_channel
  @create_mon.synchronize do
    return unless @ch_size < @max_size

    push create_channel
    # #push decrements @ch_size under @ch_dec_mon when it is handed a channel
    # that is not open. Take the same lock for the increment so both
    # read-modify-write updates of the counter exclude each other. The lock
    # is acquired only after #push has returned, so it is never re-entered.
    @ch_dec_mon.synchronize { @ch_size += 1 }
  end
end

#pop ⇒ Object Also known as: deq



17
18
19
20
21
# File 'lib/rabbit/publishing/channels_pool.rb', line 17

# Takes the next channel from the queue. When the queue currently holds
# nothing, #add_channel is invoked first to try to supply one.
#
# @return [Object] whatever the parent implementation of #pop returns
def pop
  nothing_queued = size.zero?
  add_channel if nothing_queued

  super
end

#push(channel) ⇒ Object Also known as: enq



24
25
26
27
28
# File 'lib/rabbit/publishing/channels_pool.rb', line 24

# Puts a channel into the queue, but only if it reports itself as open.
# A nil or non-open channel is not enqueued; instead the channel counter is
# decremented under @ch_dec_mon.
#
# @param channel [#open?, nil] the channel being handed to the queue
# @return [Object] the parent #push result for an open channel, otherwise
#   the decremented counter value
def push(channel)
  if channel&.open?
    super
  else
    @ch_dec_mon.synchronize { @ch_size -= 1 }
  end
end