Class: Chef::IndexQueue::AmqpClient

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/chef/index_queue/amqp_client.rb

Instance Method Summary collapse

Constructor Details

#initializeAmqpClient

Returns a new instance of AmqpClient.



24
25
26
# File 'lib/chef/index_queue/amqp_client.rb', line 24

def initialize
  reset!
end

Instance Method Details

#amqp_clientObject



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/chef/index_queue/amqp_client.rb', line 40

def amqp_client
  unless @amqp_client
    begin
      @amqp_client = Bunny.new(amqp_opts)
      Chef::Log.debug "Starting AMQP connection with client settings: #{@amqp_client.inspect}"
      @amqp_client.start
      @amqp_client.qos(:prefetch_count => 1)
    rescue Bunny::ServerDownError => e
      Chef::Log.fatal "Could not connect to rabbitmq. Is it running, reachable, and configured correctly?"
      raise e
    rescue Bunny::ProtocolError => e
      Chef::Log.fatal "Connection to rabbitmq refused. Check your rabbitmq configuration and chef's amqp* settings"
      raise e
    end
  end
  @amqp_client
end

#disconnected!Object



70
71
72
73
74
# File 'lib/chef/index_queue/amqp_client.rb', line 70

def disconnected!
  Chef::Log.error("Disconnected from the AMQP Broker (RabbitMQ)")
  @amqp_client = nil
  reset!
end

#exchangeObject



58
59
60
# File 'lib/chef/index_queue/amqp_client.rb', line 58

def exchange
  @exchange ||= amqp_client.exchange("chef-indexer", :durable => true, :type => :fanout)
end

#queueObject



62
63
64
65
66
67
68
# File 'lib/chef/index_queue/amqp_client.rb', line 62

def queue
  unless @queue
    @queue = amqp_client.queue("chef-index-consumer-" + consumer_id, :durable => durable_queue?)
    @queue.bind(exchange)
  end
  @queue
end

#reset!Object



28
29
30
31
32
33
# File 'lib/chef/index_queue/amqp_client.rb', line 28

def reset!
  @amqp_client && amqp_client.connected? && amqp_client.stop
  @amqp_client = nil
  @exchange = nil
  @queue = nil
end

#send_action(action, data) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/chef/index_queue/amqp_client.rb', line 76

def send_action(action, data)
  retries = 0
  begin
    exchange.publish(Chef::JSONCompat.to_json({"action" => action.to_s, "payload" => data}))
  rescue Bunny::ServerDownError, Bunny::ConnectionError, Errno::ECONNRESET
    disconnected!
    if (retries += 1) < 2
      Chef::Log.info("Attempting to reconnect to the AMQP broker")
      retry
    else
      Chef::Log.fatal("Could not re-connect to the AMQP broker, giving up")
      raise
    end
  end
end

#stopObject



35
36
37
38
# File 'lib/chef/index_queue/amqp_client.rb', line 35

def stop
  @queue && @queue.subscription && @queue.unsubscribe
  @amqp_client && @amqp_client.stop
end