Class: FnordMetric::InboundStream
- Inherits:
-
EventMachine::Connection
- Object
- EventMachine::Connection
- FnordMetric::InboundStream
- Defined in:
- lib/fnordmetric/inbound_stream.rb
Constant Summary collapse
- @@opts =
nil
Class Method Summary collapse
Instance Method Summary collapse
- #close_connection? ⇒ Boolean
- #get_next_uuid ⇒ Object
- #next_event ⇒ Object
- #post_init ⇒ Object
- #push_event(event_id, event_data) ⇒ Object
- #push_next_event ⇒ Object
- #read_next_event ⇒ Object
- #receive_data(chunk) ⇒ Object
- #unbind ⇒ Object
Class Method Details
.start(opts) ⇒ Object
5 6 7 8 |
# File 'lib/fnordmetric/inbound_stream.rb', line 5 def self.start(opts) @@opts = opts EM.start_server(*opts[:inbound_stream], self) end |
Instance Method Details
#close_connection? ⇒ Boolean
49 50 51 |
# File 'lib/fnordmetric/inbound_stream.rb', line 49 def close_connection? @redis.quit unless @streaming || (@events_buffered!=0) end |
#get_next_uuid ⇒ Object
45 46 47 |
# File 'lib/fnordmetric/inbound_stream.rb', line 45 def get_next_uuid rand(9999999999999999999).to_s # FIXME end |
#next_event ⇒ Object
27 28 29 30 |
# File 'lib/fnordmetric/inbound_stream.rb', line 27 def next_event read_next_event push_next_event end |
#post_init ⇒ Object
53 54 55 56 57 58 59 |
# File 'lib/fnordmetric/inbound_stream.rb', line 53 def post_init @redis = Redis.new @events_buffered = 0 @streaming = true @buffer = "" @events = [] end |
#push_event(event_id, event_data) ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/fnordmetric/inbound_stream.rb', line 15 def push_event(event_id, event_data) prefix = @@opts[:redis_prefix] @redis.hincrby "#{prefix}-stats", "events_received", 1 @redis.set "#{prefix}-event-#{event_id}", event_data @redis.lpush "#{prefix}-queue", event_id @redis.expire "#{prefix}-event-#{event_id}", @@opts[:event_queue_ttl] @events_buffered -= 1 close_connection? end |
#push_next_event ⇒ Object
39 40 41 42 43 |
# File 'lib/fnordmetric/inbound_stream.rb', line 39 def push_next_event return true if @events.empty? push_event(get_next_uuid, @events.pop) EM.next_tick(&method(:push_next_event)) end |
#read_next_event ⇒ Object
32 33 34 35 36 37 |
# File 'lib/fnordmetric/inbound_stream.rb', line 32 def read_next_event while (event = @buffer.slice!(/^(.*)\n/)) @events_buffered += 1 @events << event end end |
#receive_data(chunk) ⇒ Object
10 11 12 13 |
# File 'lib/fnordmetric/inbound_stream.rb', line 10 def receive_data(chunk) @buffer << chunk EM.defer{ next_event } end |
#unbind ⇒ Object
61 62 63 64 |
# File 'lib/fnordmetric/inbound_stream.rb', line 61 def unbind @streaming = false close_connection? end |