Class: FnordMetric::AMQPAcceptor

Inherits:
Object
  • Object
show all
Defined in:
lib/fnordmetric/acceptors/amqp_acceptor.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts) ⇒ AMQPAcceptor

Returns a new instance of AMQPAcceptor.



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/fnordmetric/acceptors/amqp_acceptor.rb', line 14

def initialize(opts)
  amqp = AMQP.connect(:host => 'firehose')
  amqp_channel = AMQP::Channel.new(amqp)

  msg_handler = lambda do |channel, data|
    event = begin
      JSON.parse(data)
    rescue
      FnordMetric.log("[AMQP] received invalid JSON: #{data[0..60]}")
    end

    if event
      event["_type"] ||= channel
      events << event
      push_next_event
    end
  end

  opts[:channels].each do |channel|
    queue = amqp_channel.queue(channel, :auto_delete => true)
    queue.subscribe{ |data| msg_handler[channel, data] }
  end
end

Class Method Details

.outbound?Boolean

Returns:

  • (Boolean)


52
53
54
# File 'lib/fnordmetric/acceptors/amqp_acceptor.rb', line 52

def self.outbound?
  true
end

.start(opts) ⇒ Object



3
4
5
6
7
8
9
10
11
12
# File 'lib/fnordmetric/acceptors/amqp_acceptor.rb', line 3

def self.start(opts)
  begin
    require "amqp"
  rescue LoadError
    FnordMetric.error("require 'amqp' failed, you need the amqp gem")
    exit 1
  end

  new(opts)
end

Instance Method Details

#apiObject



48
49
50
# File 'lib/fnordmetric/acceptors/amqp_acceptor.rb', line 48

def api
  @api ||= FnordMetric::API.new(FnordMetric.options)
end

#eventsObject



44
45
46
# File 'lib/fnordmetric/acceptors/amqp_acceptor.rb', line 44

def events
  @events ||= []
end

#push_next_eventObject



38
39
40
41
42
# File 'lib/fnordmetric/acceptors/amqp_acceptor.rb', line 38

def push_next_event
  return true if events.empty?
  api.event(@events.pop)
  EM.next_tick(&method(:push_next_event))
end