Class: FnordMetric::STOMPAcceptor

Inherits:
Object
  • Object
show all
Defined in:
lib/fnordmetric/acceptors/stomp_acceptor.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts) ⇒ STOMPAcceptor

Returns a new instance of STOMPAcceptor.



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/fnordmetric/acceptors/stomp_acceptor.rb', line 14

def initialize(opts)
  @mutex = Mutex.new

  client = Stomp::Client.new(:hosts => [{
    :host => opts[:host],
    :port => opts[:port],
    :passcode => opts[:password],
    :login => opts[:username]}])

  msg_handler = lambda do |topic, msg|
    data = msg.body

    event = begin
      JSON.parse(data)
    rescue
      FnordMetric.log("[STOMP] received invalid JSON: #{data[0..60]}")
    end

    if event
      event["_type"] ||= topic.gsub(/^\/topic\//, '')
      @mutex.synchronize{ events << event }
    end
  end

  opts[:topics].each do |topic|
    client.subscribe(topic){ |data| msg_handler[topic, data] }
  end

  Thread.new do
    client.join
  end

  EM.next_tick(&method(:push_next_event))
end

Class Method Details

.outbound?Boolean

Returns:

  • (Boolean)


67
68
69
# File 'lib/fnordmetric/acceptors/stomp_acceptor.rb', line 67

def self.outbound?
  true
end

.start(opts) ⇒ Object



3
4
5
6
7
8
9
10
11
12
# File 'lib/fnordmetric/acceptors/stomp_acceptor.rb', line 3

def self.start(opts)
  begin
    require "stomp"
  rescue LoadError
    FnordMetric.error("require 'stomp' failed, you need the stomp gem")
    exit 1
  end

  new(opts)
end

Instance Method Details

#apiObject



63
64
65
# File 'lib/fnordmetric/acceptors/stomp_acceptor.rb', line 63

def api
  @api ||= FnordMetric::API.new(FnordMetric.options)
end

#eventsObject



59
60
61
# File 'lib/fnordmetric/acceptors/stomp_acceptor.rb', line 59

def events
  @events ||= []
end

#push_next_eventObject



49
50
51
52
53
54
55
56
57
# File 'lib/fnordmetric/acceptors/stomp_acceptor.rb', line 49

def push_next_event
  nxt = @mutex.synchronize{ events.pop }
  unless nxt
    EM::Timer.new(0.01, &method(:push_next_event))
    return true
  end
  api.event(nxt)
  EM.next_tick(&method(:push_next_event))
end