Class: FnordMetric::Worker
- Inherits: Object
- Inheritance hierarchy: Object → FnordMetric::Worker
- Defined in: lib/fnordmetric/worker.rb
Instance Method Summary
- #announce_event(event) ⇒ Object
- #configure(namespaces) ⇒ Object
- #event_key(event_id) ⇒ Object
- #expire_event(event_id) ⇒ Object
- #initialize(namespaces, opts) ⇒ Worker (constructor): a new instance of Worker.
- #namespace(key) ⇒ Object
- #parse_json(data) ⇒ Object
- #process_event(event_id, event_data) ⇒ Object
- #publish_event(event) ⇒ Object
- #pubsub_key ⇒ Object
- #queue_key ⇒ Object
- #ready! ⇒ Object
- #stats_key ⇒ Object
- #tick ⇒ Object
Constructor Details
#initialize(namespaces, opts) ⇒ Worker
Returns a new instance of Worker.
3 4 5 6 7 |
# File 'lib/fnordmetric/worker.rb', line 3
#
# Sets up a worker: keeps the given options, starts with an empty
# namespace table and then registers every entry of `namespaces`
# through #configure.
def initialize(namespaces, opts)
  @opts = opts
  @namespaces = {}
  configure(namespaces)
end
Instance Method Details
#announce_event(event) ⇒ Object
60 61 62 |
# File 'lib/fnordmetric/worker.rb', line 60
#
# Looks up the namespace named by the event's :_namespace field, readies
# it with this worker's Redis connection and announces the event on
# whatever `ready!` returns.
def announce_event(event)
  target = namespace(event[:_namespace])
  target.ready!(@redis).announce(event)
end
#configure(namespaces) ⇒ Object
14 15 16 17 18 19 |
# File 'lib/fnordmetric/worker.rb', line 14
#
# Registers namespaces. For every key/block pair a new
# FnordMetric::Namespace is built from the key and a clone of the worker
# options, stored under that key, and then the block is instance_eval'd
# on it.
def configure(namespaces)
  namespaces.each do |key, setup|
    # Store before evaluating the block, exactly as the table is filled
    # even if the block raises.
    registered = @namespaces[key] = FnordMetric::Namespace.new(key, @opts.clone)
    registered.instance_eval(&setup)
  end
end
#event_key(event_id) ⇒ Object
52 53 54 |
# File 'lib/fnordmetric/worker.rb', line 52
#
# Redis key for one event: the configured :redis_prefix, the literal
# "event" and the event id, separated by "-".
def event_key(event_id)
  "#{@opts[:redis_prefix]}-event-#{event_id}"
end
#expire_event(event_id) ⇒ Object
64 65 66 |
# File 'lib/fnordmetric/worker.rb', line 64
#
# Sets an expiry on the stored event data: calls `expire` on the Redis
# connection with the event's key and the :event_data_ttl option.
def expire_event(event_id)
  key = event_key(event_id)
  ttl = @opts[:event_data_ttl]
  @redis.expire(key, ttl)
end
#namespace(key) ⇒ Object
72 73 74 |
# File 'lib/fnordmetric/worker.rb', line 72
#
# Returns a clone of the namespace registered under `key`. When no
# namespace is registered under that key, the first registered
# namespace is cloned instead.
def namespace(key)
  chosen = @namespaces[key]
  chosen ||= @namespaces.first.last # first [key, namespace] pair -> namespace
  chosen.clone
end
#parse_json(data) ⇒ Object
76 77 78 |
# File 'lib/fnordmetric/worker.rb', line 76
#
# Parses `data` with a fresh Yajl parser configured with
# symbolize_keys: true and returns the parser's result.
def parse_json(data)
  parser = Yajl::Parser.new(symbolize_keys: true)
  parser.parse(data)
end
#process_event(event_id, event_data) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/fnordmetric/worker.rb', line 32
#
# Handles one queued event inside an EM.defer block: parses the raw
# JSON, stamps the event with :_time (only if it has none) and :_eid,
# then announces it, publishes it and sets the expiry on its stored data.
def process_event(event_id, event_data)
  EM.defer do
    event = parse_json(event_data)
    event[:_time] = Time.now.to_i unless event[:_time]
    event[:_eid] = event_id

    announce_event(event)
    publish_event(event)
    expire_event(event_id)

    event # the deferred block evaluates to the processed event
  end
end
#publish_event(event) ⇒ Object
68 69 70 |
# File 'lib/fnordmetric/worker.rb', line 68
#
# Publishes the event's id (:_eid) on the pub/sub channel named by
# #pubsub_key.
def publish_event(event)
  channel = pubsub_key
  @redis.publish(channel, event[:_eid])
end
#pubsub_key ⇒ Object
44 45 46 |
# File 'lib/fnordmetric/worker.rb', line 44
#
# Name of the pub/sub channel: the configured :redis_prefix followed by
# "-announce".
def pubsub_key
  "#{@opts[:redis_prefix]}-announce"
end
#queue_key ⇒ Object
48 49 50 |
# File 'lib/fnordmetric/worker.rb', line 48
#
# Redis key of the event queue: the configured :redis_prefix followed by
# "-queue".
def queue_key
  "#{@opts[:redis_prefix]}-queue"
end
#ready! ⇒ Object
9 10 11 12 |
# File 'lib/fnordmetric/worker.rb', line 9
#
# Opens the worker's Redis connection via EM::Hiredis using the
# :redis_uri option, then starts the processing loop with #tick.
def ready!
  uri = @opts[:redis_uri]
  @redis = EM::Hiredis.connect(uri)
  tick
end
#stats_key ⇒ Object
56 57 58 |
# File 'lib/fnordmetric/worker.rb', line 56
#
# Redis key of the stats hash: the configured :redis_prefix followed by
# "-stats".
def stats_key
  "#{@opts[:redis_prefix]}-stats"
end
#tick ⇒ Object
21 22 23 24 25 26 27 28 29 30 |
# File 'lib/fnordmetric/worker.rb', line 21
#
# One iteration of the worker loop. Pops the next event id from the
# queue (BLPOP with timeout 0), fetches the event's stored data and
# hands it to #process_event; if no data is found the loss is logged
# instead. In both cases the next iteration is scheduled with
# EM.next_tick and the :events_processed field of the stats hash is
# incremented by one.
def tick
  pending = @redis.blpop(queue_key, 0)
  pending.callback do |_queue, event_id|
    @redis.get(event_key(event_id)).callback do |event_data|
      if event_data
        process_event(event_id, event_data)
      else
        FnordMetric.log("oops, lost an event :(")
      end

      EM.next_tick(&method(:tick))
      @redis.hincrby(stats_key, :events_processed, 1)
    end
  end
end