Class: Chef::IndexQueue::AmqpClient

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/chef/index_queue/amqp_client.rb

Constant Summary collapse

VNODES =
1024

Instance Method Summary collapse

Constructor Details

#initialize ⇒ AmqpClient

Returns a new instance of AmqpClient.



26
27
28
# File 'lib/chef/index_queue/amqp_client.rb', line 26

# Builds the client with no cached connection state by delegating to
# #reset!, which clears the memoized AMQP client and exchange.
def initialize
  reset!
end

Instance Method Details

#amqp_client ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/chef/index_queue/amqp_client.rb', line 40

# Returns the memoized Bunny connection, opening it on first use.
#
# The connection is built in a local variable and only stored in
# @amqp_client once `start` and `qos` have both succeeded. Memoizing before
# the connection is up would leave a half-initialised client cached after a
# failed attempt, and every later call would hand back that unstarted
# client instead of trying to connect again.
#
# @return [Object] the started Bunny client
# @raise [Bunny::ServerDownError] when the broker cannot be reached
# @raise [Bunny::ProtocolError] when the broker refuses the connection
def amqp_client
  return @amqp_client if @amqp_client

  client = Bunny.new(amqp_opts)
  Chef::Log.debug "Starting AMQP connection with client settings: #{client.inspect}"
  client.start
  client.qos(:prefetch_count => 1)
  @amqp_client = client
rescue Bunny::ServerDownError
  Chef::Log.fatal "Could not connect to rabbitmq. Is it running, reachable, and configured correctly?"
  raise
rescue Bunny::ProtocolError
  Chef::Log.fatal "Connection to rabbitmq refused. Check your rabbitmq configuration and chef's amqp* settings"
  raise
end

#disconnected! ⇒ Object



62
63
64
65
66
# File 'lib/chef/index_queue/amqp_client.rb', line 62

# Logs that the broker connection was lost and discards the cached
# connection state.
#
# @amqp_client is cleared *before* calling #reset! so that the guard at the
# top of #reset! short-circuits and no `stop` is attempted on the dead
# connection.
def disconnected!
  Chef::Log.error("Disconnected from the AMQP Broker (RabbitMQ)")
  @amqp_client = nil
  reset!
end

#exchange ⇒ Object



58
59
60
# File 'lib/chef/index_queue/amqp_client.rb', line 58

# Returns the durable fanout exchange named "chef-indexer", declaring it
# through the AMQP client on first use and caching it afterwards.
def exchange
  return @exchange if @exchange

  @exchange = amqp_client.exchange("chef-indexer", :durable => true, :type => :fanout)
end

#queue_for_object(obj_id) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/chef/index_queue/amqp_client.rb', line 68

# Yields the durable "vnode-N" queue for +obj_id+, where N is the object's
# integer id modulo VNODES.
#
# If the broker connection fails while declaring the queue or while running
# the block, the connection state is discarded and the whole operation is
# retried once; a second failure is logged as fatal and re-raised.
#
# @param obj_id the object identifier passed to obj_id_to_int
# @return whatever the given block returns
def queue_for_object(obj_id)
  queue_name = "vnode-#{obj_id_to_int(obj_id) % VNODES}"
  failures = 0
  begin
    yield amqp_client.queue(queue_name, :passive => false, :durable => true, :exclusive => false, :auto_delete => false)
  rescue Bunny::ServerDownError, Bunny::ConnectionError, Errno::ECONNRESET
    disconnected!
    failures += 1
    unless failures < 2
      Chef::Log.fatal("Could not re-connect to the AMQP broker, giving up")
      raise
    end
    Chef::Log.info("Attempting to reconnect to the AMQP broker")
    retry
  end
end

#reset! ⇒ Object



30
31
32
33
34
# File 'lib/chef/index_queue/amqp_client.rb', line 30

# Stops the current connection if one is cached and still connected, then
# forgets both the cached client and the cached exchange.
#
# @return [nil]
def reset!
  if @amqp_client && amqp_client.connected?
    amqp_client.stop
  end
  @amqp_client = @exchange = nil
end

#stop ⇒ Object



36
37
38
# File 'lib/chef/index_queue/amqp_client.rb', line 36

# Stops the cached AMQP client, if there is one. The cached reference itself
# is left in place.
#
# @return the result of the client's `stop`, or the falsy cached value when
#   no client is present
def stop
  connection = @amqp_client
  return connection unless connection

  connection.stop
end