Module: Chef::IndexQueue::Consumer

Defined in:
lib/chef/index_queue/consumer.rb

Defined Under Namespace

Modules: ClassMethods

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(including_class) ⇒ Object



37
38
39
# File 'lib/chef/index_queue/consumer.rb', line 37

# Hook invoked with the class (or module) that mixes this module in; it
# extends that receiver with ClassMethods so the methods defined there
# become singleton methods of the including class.
#
# @param including_class [Module] the class or module being extended
# @return [Object] the result of the extend call
def self.included(including_class)
  including_class.extend(ClassMethods)
end

Instance Method Details

#call_action_for_message(message) ⇒ Object



58
59
60
61
62
63
64
# File 'lib/chef/index_queue/consumer.rb', line 58

# Decodes one queue message and dispatches it to the method it names.
#
# The JSON text found at message[:payload] is parsed into an envelope. The
# envelope's "action" entry is converted to a symbol and its "payload" entry
# is passed, unchanged, as the single argument to the method of that name.
#
# @param message [#[]] delivered message; message[:payload] is handed to the
#   JSON parser (presumably a Hash supplied by the queue subscription)
# @return [Object] whatever the dispatched method returns
def call_action_for_message(message)
  envelope = Chef::JSONCompat.from_json(
    message[:payload],
    :create_additions => false,
    :max_nesting      => false
  )
  handler_name = envelope["action"].to_sym
  handler_arg  = envelope["payload"]

  # The whitelist check has to run before the dynamic dispatch: send will
  # call any method on this object, including private ones.
  assert_method_whitelisted(handler_name)
  send(handler_name, handler_arg)
end

#run ⇒ Object Also known as: start



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/chef/index_queue/consumer.rb', line 41

# Subscribes to the index queue and hands every delivered message to
# call_action_for_message.
#
# If the subscription is interrupted by Bunny::ConnectionError,
# Errno::ECONNRESET or Bunny::ServerDownError, the AMQP client is marked as
# disconnected, a warning is logged, and the subscription is attempted again
# after a one-second pause. The number of attempts is not capped.
#
# @return [Object] the value returned by the queue's subscribe call, should
#   it ever return
def run
  Chef::Log.debug("Starting Index Queue Consumer")
  AmqpClient.instance.queue # triggers connection setup

  # A plain `while` is used instead of Kernel#loop because loop would
  # silently swallow a StopIteration raised from inside the subscription.
  while true
    begin
      return AmqpClient.instance.queue.subscribe(:ack => true, :timeout => false) { |message|
        call_action_for_message(message)
      }
    rescue Bunny::ConnectionError, Errno::ECONNRESET, Bunny::ServerDownError
      AmqpClient.instance.disconnected!
      Chef::Log.warn "Connection to rabbitmq lost. attempting to reconnect"
      sleep 1
      # falling out of the rescue re-enters the loop and subscribes again
    end
  end
end