Class: RabbitJobs::AmqpHelper

Inherits:
Object
  • Object
show all
Defined in:
lib/rabbit_jobs/amqp_helper.rb

Constant Summary collapse

RECOVERY_TIMEOUT =

Timeout to recover connection.

3
HOSTS_DEAD =
[]
HOSTS_FAILED =
{}
AUTO_RECOVERY_ENABLED =
true

Class Method Summary collapse

Class Method Details

.create_channel(connection) ⇒ Object



25
26
27
# File 'lib/rabbit_jobs/amqp_helper.rb', line 25

def create_channel(connection)
  AMQP::Channel.new(connection, auto_recovery: AUTO_RECOVERY_ENABLED)
end

.init_auto_recovery(connection) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/rabbit_jobs/amqp_helper.rb', line 29

def init_auto_recovery(connection)
  connection.on_recovery do |conn, opts|
    HOSTS_DEAD.clear
    HOSTS_FAILED.clear
    url = url_from_opts opts
    RJ.logger.warn "Connection to #{url} recovered."
  end

  connection.on_open do |conn, opts|
    RJ.logger.info "Connected."
  end

  # connection.on_tcp_connection_loss do |conn, opts|
  #   sleep 1
  #   restore_from_connection_failure(conn, opts) unless conn.closed?
  # end

  connection.on_tcp_connection_failure do |opts|
    sleep 1
    restore_from_connection_failure(connection, opts)
  end


  # connection.before_recovery do |conn, opts|
  #   RJ.logger.info "before_recovery"
  # end

  # connection.on_possible_authentication_failure do |conn, opts|
  #   puts opts.inspect
  #   # restore_from_connection_failure(conn, opts)
  # end

  # connection.on_connection_interruption do |conn|
  #   # restore_from_connection_failure(conn, opts)
  # end
end

.prepare_connection(conn) ⇒ Object



16
17
18
19
20
21
22
23
# File 'lib/rabbit_jobs/amqp_helper.rb', line 16

def prepare_connection(conn)
  if !conn || conn.closed?
    RJ.logger.info("Connecting to #{RJ.config.servers.first.to_s}...")
    conn = AMQP.connect(RJ.config.servers.first, auto_recovery: AUTO_RECOVERY_ENABLED)
    init_auto_recovery(conn) if AUTO_RECOVERY_ENABLED
  end
  conn
end