Class: FnordMetric::InboundStream

Inherits:
EventMachine::Connection
  • Object
show all
Defined in:
lib/fnordmetric/inbound_stream.rb

Constant Summary collapse

@@opts =
nil

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.start(opts) ⇒ Object



5
6
7
8
# File 'lib/fnordmetric/inbound_stream.rb', line 5

# Stores the server options for later use by connections and starts the
# EventMachine server that accepts inbound event streams.
#
# @param opts [Hash] server options; the value under +:inbound_stream+ is
#   splatted into the leading arguments of +EM.start_server+ (presumably
#   host and port -- confirm against the caller)
# @return [Object] whatever +EM.start_server+ returns
def self.start(opts)
  @@opts = opts
  # This class is passed last so EventMachine uses it as the connection handler.
  listen_args = [*opts[:inbound_stream], self]
  EM.start_server(*listen_args)
end

Instance Method Details

#close_connection? ⇒ Boolean

Returns:

  • (Boolean)


49
50
51
# File 'lib/fnordmetric/inbound_stream.rb', line 49

# Quits the Redis client once the stream is no longer active and the count
# of buffered events is zero; otherwise does nothing.
#
# @return [Object, nil] the result of +@redis.quit+, or nil when the Redis
#   client is left open
def close_connection?
  return if @streaming
  return unless @events_buffered == 0

  @redis.quit
end

#get_next_uuid ⇒ Object



45
46
47
# File 'lib/fnordmetric/inbound_stream.rb', line 45

# Generates the identifier used for the next event.
#
# FIXME: this is a pseudo-random integer rendered as a decimal string, not a
# real UUID, so uniqueness is only probabilistic.
#
# @return [String] decimal string of a random integer below
#   9999999999999999999
def get_next_uuid
  upper_bound = 9_999_999_999_999_999_999
  rand(upper_bound).to_s
end

#next_event ⇒ Object



27
28
29
30
# File 'lib/fnordmetric/inbound_stream.rb', line 27

# Runs one processing pass over the receive buffer: first moves every
# complete line into the pending-event list, then starts pushing pending
# events. The order matters -- events must be extracted before pushing.
#
# @return [Object] the result of +push_next_event+
def next_event
  read_next_event
  push_next_event
end

#post_init ⇒ Object



53
54
55
56
57
58
59
# File 'lib/fnordmetric/inbound_stream.rb', line 53

# Sets up per-connection state: a Redis client, the streaming flag, the
# counter of events awaiting a push, an empty receive buffer and an empty
# pending-event list.
# (Presumably invoked by EventMachine when the connection is set up --
# confirm.)
#
# @return [Array] the freshly created, empty pending-event list
def post_init
  @redis = Redis.new
  @streaming, @events_buffered = true, 0
  @buffer = ""
  @events = []
end

#push_event(event_id, event_data) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
# File 'lib/fnordmetric/inbound_stream.rb', line 15

# Stores one event in Redis and enqueues its id for processing.
#
# In order: increments the "events_received" field of the stats hash, writes
# the event payload under its own key, pushes the event id onto the queue
# list, and sets the configured expiry on the payload key. Afterwards the
# buffered-event counter is decremented and the Redis client is quit if
# nothing else is pending.
#
# @param event_id [String] identifier used in the payload key and the queue
# @param event_data [String] raw event payload
# @return [Object, nil] the result of +close_connection?+
def push_event(event_id, event_data)
  prefix    = @@opts[:redis_prefix]
  event_key = "#{prefix}-event-#{event_id}"

  @redis.hincrby("#{prefix}-stats", "events_received", 1)
  # The payload is written before its id is queued.
  @redis.set(event_key, event_data)
  @redis.lpush("#{prefix}-queue", event_id)
  @redis.expire(event_key, @@opts[:event_queue_ttl])

  @events_buffered -= 1
  close_connection?
end

#push_next_event ⇒ Object



39
40
41
42
43
# File 'lib/fnordmetric/inbound_stream.rb', line 39

# Pushes a single pending event and reschedules itself on the next reactor
# tick, so the pending list is worked through one event per tick.
#
# The event is taken with Array#pop, i.e. the most recently buffered event
# is pushed first.
#
# @return [true, Object] true when nothing is pending, otherwise the result
#   of +EM.next_tick+
def push_next_event
  if @events.empty?
    true
  else
    event_id = get_next_uuid
    push_event(event_id, @events.pop)
    continuation = method(:push_next_event)
    EM.next_tick(&continuation)
  end
end

#read_next_event ⇒ Object



32
33
34
35
36
37
# File 'lib/fnordmetric/inbound_stream.rb', line 32

# Moves every complete, newline-terminated line out of the receive buffer
# into the pending-event list, counting each one. A trailing partial line
# (no newline yet) stays in the buffer. Each event keeps its trailing "\n".
#
# @return [nil]
def read_next_event
  loop do
    line = @buffer.slice!(/^(.*)\n/)
    break unless line

    @events_buffered += 1
    @events << line
  end
end

#receive_data(chunk) ⇒ Object



10
11
12
13
# File 'lib/fnordmetric/inbound_stream.rb', line 10

# Appends a chunk of received data to the receive buffer and hands a
# processing pass (+next_event+) to +EM.defer+.
#
# NOTE(review): EM.defer presumably runs the block off the reactor thread
# while this method keeps appending to the same buffer -- confirm that the
# shared state (@buffer, @events, @events_buffered) is safe to use that way.
#
# @param chunk [String] raw data received on the connection
# @return [Object] whatever +EM.defer+ returns
def receive_data(chunk)
  @buffer.concat(chunk)
  EM.defer do
    next_event
  end
end

#unbind ⇒ Object



61
62
63
64
# File 'lib/fnordmetric/inbound_stream.rb', line 61

# Marks the stream as finished and quits the Redis client if no buffered
# events remain; otherwise the quit happens after the last pending event has
# been pushed (see +push_event+).
# (Presumably invoked by EventMachine when the connection closes -- confirm.)
#
# @return [Object, nil] the result of +close_connection?+
def unbind
  # The flag must be cleared first: close_connection? only quits when
  # @streaming is falsy.
  @streaming = false
  close_connection?
end