Class: Wildcloud::Keeper::Transport::Amqp

Inherits:
Object
  • Object
show all
Defined in:
lib/wildcloud/keeper/transport/amqp.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Amqp

Returns a new instance of Amqp.



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/wildcloud/keeper/transport/amqp.rb', line 28

# Connects to the AMQP broker and declares the exchange and queues used by
# this transport.
#
# Reads the 'amqp', 'workers', 'node' and 'builder' entries from
# Keeper.configuration. The node queue is bound to the 'wildcloud.keeper'
# topic exchange under the shared 'nodes' key and under a key specific to
# this node's name. The build queue is declared only when the 'builder'
# entry is truthy; otherwise @builders is left unset.
def initialize
  Keeper.logger.debug('AMQP') { 'Connecting to broker' }

  @connection = AMQP.connect(Keeper.configuration['amqp'])
  Keeper.add_amqp_logger(@connection)

  @channel = AMQP::Channel.new(@connection)
  @channel.prefetch(Keeper.configuration['workers'])

  @exchange = @channel.topic('wildcloud.keeper')

  # The node name appears in both the queue name and one routing key.
  node_name = Keeper.configuration['node']['name']
  @queue = @channel.queue("wildcloud.keeper.node.#{node_name}")
  ['nodes', "node.#{node_name}"].each do |routing_key|
    @queue.bind(@exchange, :routing_key => routing_key)
  end

  return unless Keeper.configuration['builder']

  @builders = @channel.queue('wildcloud.keeper.build')
  @builders.bind(@exchange, :routing_key => 'build')
end

Instance Method Details

#process_message(block, payload) ⇒ Object



58
59
60
# File 'lib/wildcloud/keeper/transport/amqp.rb', line 58

# Parses a JSON payload and hands the parsed value to the given callable.
#
# @param block [#call] callable invoked with the parsed payload
# @param payload [String] JSON text to parse
# @return [Object] whatever +block.call+ returns
# @raise [JSON::ParserError] when +payload+ is not valid JSON
def process_message(block, payload)
  message = JSON.parse(payload)
  block.call(message)
end

#send(message, key) ⇒ Object



62
63
64
65
# File 'lib/wildcloud/keeper/transport/amqp.rb', line 62

# Serializes a message to JSON and publishes it on the transport's exchange.
#
# NOTE: this method shadows Object#send for instances of this class; use
# +__send__+ or +public_send+ when dynamic dispatch is needed on them.
#
# @param message [Object] value passed to JSON.dump
# @param key [#to_s] routing key; converted with +to_s+ before publishing
# @return [Object] whatever +@exchange.publish+ returns
def send(message, key)
  Keeper.logger.debug('AMQP') { "Publishing message (key: #{key}) #{message.inspect}" }

  body = JSON.dump(message)
  @exchange.publish(body, :routing_key => key.to_s)
end

#start(&block) ⇒ Object



48
49
50
51
52
53
54
55
56
# File 'lib/wildcloud/keeper/transport/amqp.rb', line 48

# Subscribes to the node queue and, when it was declared, the build queue,
# forwarding every received payload to +block+ via #process_message.
#
# The subscription handles are stored in @subscription and @building.
#
# @param block [Proc] handler passed to #process_message for each payload
# @return [Object, nil] the build-queue subscription result, or nil when no
#   build queue exists
def start(&block)
  Keeper.logger.info('AMQP') { 'Starting to receive messages' }

  # The first block argument is unused here (presumably the delivery
  # metadata/header supplied by the amqp gem — confirm against its docs).
  @subscription = @queue.subscribe do |_metadata, payload|
    process_message(block, payload)
  end

  # @builders is only set when the build queue was declared in #initialize.
  return unless @builders

  @building = @builders.subscribe do |_metadata, payload|
    process_message(block, payload)
  end
end