Module: PubSub::FanoutPublisher

Includes:
BunnyConfig
Defined in:
lib/pub_sub/fanout_publisher.rb

Constant Summary collapse

RETRY_WAIT =
5

Instance Method Summary collapse

Methods included from BunnyConfig

#bunny_config

Instance Method Details

#bunny ⇒ Object



19
20
21
22
23
24
25
26
27
# File 'lib/pub_sub/fanout_publisher.rb', line 19

# Returns the memoised Bunny connection, creating and starting one on first use.
#
# The instance is only stored in @bunny after #start has returned, so a failed
# start leaves @bunny unset and the next call tries again.
#
# @return the started Bunny instance built from #bunny_config
def bunny
  return @bunny if @bunny

  logger.info("PubSub:FanoutPublisher#bunny: initializing and starting new bunny instance")

  @bunny = Bunny.new(bunny_config).tap { |connection| connection.start }
end

#logger ⇒ Object



11
12
13
14
15
16
17
# File 'lib/pub_sub/fanout_publisher.rb', line 11

# Returns the logger in use, lazily defaulting to a STDOUT logger at WARN level
# when none has been assigned.
#
# @return the memoised logger
def logger
  @logger ||= Logger.new(STDOUT).tap { |default_logger| default_logger.level = Logger::WARN }
end

#logger=(logger) ⇒ Object



7
8
9
# File 'lib/pub_sub/fanout_publisher.rb', line 7

# Stores the given logger in @logger, replacing any previously stored one.
#
# @param logger the logger object to store
def logger=(logger)
  @logger = logger
end

#publish(msg, exchange_name) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/pub_sub/fanout_publisher.rb', line 29

# Publishes +msg+ to the durable fanout exchange named +exchange_name+.
#
# A StandardError (or Timeout::Error) raised while declaring the exchange or
# while publishing is logged at info level; the method then sleeps RETRY_WAIT
# seconds, calls #reset and retries. There is no retry limit, so this call
# does not return until a publish attempt succeeds.
#
# @param msg the payload handed to the exchange's #publish
# @param exchange_name name of the fanout exchange to declare
# @return whatever the exchange's #publish returns
def publish(msg, exchange_name)
  begin
    exchange = bunny.exchange(exchange_name, :type => :fanout, :durable => true)
  rescue StandardError, Timeout::Error => error
    logger.info("PubSub:FanoutPublisher#publish: resetting and reconnecting to #{exchange_name}, because - #{error}")
    sleep RETRY_WAIT
    reset
    retry
  end

  # non-persistent to a fanout exchange
  exchange.publish(msg)
rescue StandardError, Timeout::Error => error
  # Only names that are in scope may be interpolated here: an undefined name
  # raises NameError from inside this handler, which skips the retry below.
  logger.info("PubSub:FanoutPublisher#publish: error encountered while publishing to #{exchange_name}, retrying - #{error}")
  sleep RETRY_WAIT
  reset
  retry
end

#reset ⇒ Object



52
53
54
55
56
57
# File 'lib/pub_sub/fanout_publisher.rb', line 52

# Stops the current Bunny instance (best effort) and clears @bunny.
#
# @return [nil]
def reset
  logger.info("PubSub:FanoutPublisher#reset: stopping bunny")

  begin
    @bunny.stop
  rescue StandardError
    # Best effort: a failed stop (including @bunny being nil) is ignored.
    nil
  end

  @bunny = nil
end