Module: PubSub::Subscriber

Includes:
BunnyConfig
Defined in:
lib/pub_sub/subscriber.rb

Constant Summary collapse

RETRY_WAIT =
5

Instance Method Summary collapse

Methods included from BunnyConfig

#bunny_config

Instance Method Details

#bunnyObject



23
24
25
26
27
28
29
30
31
32
# File 'lib/pub_sub/subscriber.rb', line 23

def bunny
  @bunny ||= begin
    logger.info("PubSub:Subscriber#bunny: initializing and starting new bunny instance")
    
    b = Bunny.new(bunny_config)
    b.start
    b.qos
    b
  end
end

#loggerObject



11
12
13
14
15
16
17
# File 'lib/pub_sub/subscriber.rb', line 11

def logger
  @logger ||= begin
    logger = Logger.new(STDOUT)
    logger.level = Logger::WARN
    logger
  end
end

#logger=(logger) ⇒ Object



7
8
9
# File 'lib/pub_sub/subscriber.rb', line 7

def logger=(logger)
  @logger = logger
end

#queue_name=(queue_name) ⇒ Object



19
20
21
# File 'lib/pub_sub/subscriber.rb', line 19

def queue_name=(queue_name)
  @queue_name = queue_name
end

#resetObject



34
35
36
37
38
39
40
41
# File 'lib/pub_sub/subscriber.rb', line 34

def reset
  logger.info("PubSub:Subscriber#reset: unsubscribing from queue and stopping bunny")
  
  @queue.unsubscribe rescue nil
  @queue = nil
  @bunny.stop rescue nil
  @bunny = nil
end

#subscribe(&block) ⇒ Object

simple subsciption loop on a durable queue, automatic acks creates the queue if it does not already exist



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/pub_sub/subscriber.rb', line 45

def subscribe(&block)
  raise "queue_name must be set before calling subscribe" unless @queue_name

  logger.info("PubSub:Subscriber#subscribe: subscribing to queue - #{@queue_name}")
  begin
    @queue = bunny.queue(@queue_name, :durable => true)
    @queue.subscribe(:ack => true) do |msg|
      block.call(msg)
    end
  rescue StandardError, Timeout::Error => error
    logger.info("PubSub:Subscriber: error encountered while subscribing to #{@queue_name}, retrying - #{error}")
    sleep RETRY_WAIT
    reset
    retry
  end
end