Class: Queight::ChannelPool

Inherits:
Object
  • Object
show all
Defined in:
lib/queight/channel_pool.rb

Constant Summary collapse

DEFAULT_POOL_SIZE =
5

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ ChannelPool

Returns a new instance of ChannelPool.



10
11
12
13
14
15
# File 'lib/queight/channel_pool.rb', line 10

# Sets up the pool's initial state: the given options are stored as-is,
# the wrapper list starts empty, the error counter starts at zero, and a
# fresh Monitor is created for locking.
#
# @param options [Object] kept in @options without modification
#   (NOTE(review): presumably connection/pool settings - confirm against callers)
def initialize(options)
  @lock = Monitor.new
  @error_count = 0
  @wrappers = []
  @options = options
end

Instance Method Details

#create_channel(prefetch = nil) ⇒ Object



38
39
40
41
42
43
# File 'lib/queight/channel_pool.rb', line 38

# Opens a new channel on the connection returned by `conn` and, when a
# prefetch value is supplied, applies it to that channel.
#
# @param prefetch [Object, nil] passed to the channel's #prefetch when truthy;
#   nil (the default) or false leaves the channel untouched
# @return [Object] the channel returned by `conn.create_channel`
def create_channel(prefetch = nil)
  conn.create_channel.tap do |new_channel|
    new_channel.prefetch(prefetch) if prefetch
  end
end

#reset_cache! ⇒ Object



45
46
47
# File 'lib/queight/channel_pool.rb', line 45

# Calls #reset_cache! on every entry in @wrappers.
#
# @return [Array] the @wrappers collection itself (the result of #each)
def reset_cache!
  @wrappers.each do |wrapper|
    wrapper.reset_cache!
  end
end

#with_channel ⇒ Object



17
18
19
20
21
# File 'lib/queight/channel_pool.rb', line 17

# Runs the given block with a channel obtained from `channel_pool.run`,
# with the whole operation wrapped in `tracking_bunny_errors`.
#
# @yieldparam channel [Object] the channel handed out by `channel_pool.run`
# @return [Object] whatever `tracking_bunny_errors` returns
def with_channel
  tracking_bunny_errors do
    pool = channel_pool
    pool.run do |pooled_channel|
      yield(pooled_channel)
    end
  end
end

#with_subscribe_channel(prefetch) ⇒ Object



29
30
31
32
33
34
35
36
# File 'lib/queight/channel_pool.rb', line 29

# Creates a dedicated channel via #create_channel, yields it to the block,
# and closes it afterwards, including when the block raises. Channel
# creation and the block both run inside `tracking_bunny_errors`.
#
# @param prefetch [Object, nil] forwarded unchanged to #create_channel
# @yieldparam channel [Object] the channel returned by #create_channel
# @return [Object] whatever `tracking_bunny_errors` returns
def with_subscribe_channel(prefetch)
  # Declared at method scope so the ensure clause refers to the same variable
  # the block assigns. A variable first assigned inside the block would be
  # block-local and invisible to ensure.
  channel = nil
  tracking_bunny_errors do
    channel = create_channel(prefetch)
    yield(channel)
  end
ensure
  # Still nil when create_channel itself raised; skipping the close keeps
  # that original error from being replaced by a NoMethodError.
  channel.close if channel
end

#with_transactional_channel ⇒ Object



23
24
25
26
27
# File 'lib/queight/channel_pool.rb', line 23

# Runs the given block with a channel obtained from
# `transactional_channel_pool.run`, with the whole operation wrapped in
# `tracking_bunny_errors`.
#
# @yieldparam channel [Object] the channel handed out by
#   `transactional_channel_pool.run`
# @return [Object] whatever `tracking_bunny_errors` returns
def with_transactional_channel
  tracking_bunny_errors do
    pool = transactional_channel_pool
    pool.run do |tx_channel|
      yield(tx_channel)
    end
  end
end