Class: RoMQ::Connection

Inherits:
Object
  • Object
show all
Includes:
Helpers
Defined in:
lib/romq/connection.rb

Constant Summary

Constants included from Helpers

Helpers::DEBUG

Instance Method Summary collapse

Methods included from Helpers

#flush_logs_periodically, #logger, #stop_gracefully

Constructor Details

#initialize(&block) ⇒ Connection

Returns a new instance of Connection.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/romq/connection.rb', line 10

def initialize(&block)
  stop_gracefully

  EM.run do
    flush_logs_periodically.()

    connection = Config.rabbitmq_connection
    @amqp_connection = AMQP.connect(connection)

    @amqp_connection.on_error do |channel, connection_close|
      raise channel_close.reply_text
    end

    @amqp_connection.on_open do
      logger.info("[RoMQ] connected to #{connection[:host]}")
      @amqp_connection.on_tcp_connection_loss do
        reconnect.(block)
      end

      channel  = AMQP::Channel.new(@amqp_connection, auto_recovery: true)
      channel.on_error(&handle_channel_error)
      channel.prefetch(Config::RABBITMQ_PREFECTH)

      yield(channel)
    end
  end
rescue AMQP::TCPConnectionFailed
  reconnect.(block)
rescue AMQP::PossibleAuthenticationFailureError
  logger.info("[RoMQ] could not authenticate, check your credentials: #{@amqp_connection.settings}")
  stop.()
end

Instance Method Details

#handle_channel_errorObject



65
66
67
68
69
70
71
72
73
# File 'lib/romq/connection.rb', line 65

def handle_channel_error
  Proc.new do |channel, channel_close|
    logger.error("[RoMQ] channel-level exception")
    logger.error("[RoMQ] AMQP class id : #{channel_close.class_id}")
    logger.error("[RoMQ] AMQP method id: #{channel_close.method_id}")
    logger.error("[RoMQ] Status code   : #{channel_close.reply_code}")
    logger.error("[RoMQ] Error message : #{channel_close.reply_text}")
  end
end

#reconnectObject



43
44
45
46
47
48
49
50
# File 'lib/romq/connection.rb', line 43

def reconnect
  Proc.new do |block|
    logger.info("[RoMQ] will try to re-connect in #{Config::RABBITMQ_RECONNECT_IN} seconds...")
    EM.stop if EM.reactor_running?
    sleep Config::RABBITMQ_RECONNECT_IN
    Connection.new(&block)
  end
end

#stopObject



52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/romq/connection.rb', line 52

def stop
  Proc.new do
    if @amqp_connection and @amqp_connection.connected?
      @amqp_connection.disconnect do
        logger.info("[RoMQ] closed connection to #{@amqp_connection.settings[:host]}")
        super
      end
    else
      super
    end
  end
end