Class: FnordMetric::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/fnordmetric/worker.rb

Instance Method Summary collapse

Constructor Details

#initialize(namespaces, opts) ⇒ Worker

Returns a new instance of Worker.



3
4
5
6
7
# File 'lib/fnordmetric/worker.rb', line 3

def initialize(namespaces, opts)        
  @namespaces = {}
  @opts = opts
  configure(namespaces)
end

Instance Method Details

#announce_event(event) ⇒ Object



60
61
62
# File 'lib/fnordmetric/worker.rb', line 60

def announce_event(event)   
  namespace(event[:_namespace]).ready!(@redis).announce(event)
end

#configure(namespaces) ⇒ Object



14
15
16
17
18
19
# File 'lib/fnordmetric/worker.rb', line 14

def configure(namespaces)   
  namespaces.each do |key, block|
    @namespaces[key] = FnordMetric::Namespace.new(key, @opts.clone)
    @namespaces[key].instance_eval(&block)
  end
end

#event_key(event_id) ⇒ Object



52
53
54
# File 'lib/fnordmetric/worker.rb', line 52

def event_key(event_id)
  [@opts[:redis_prefix], 'event', event_id].join("-")
end

#expire_event(event_id) ⇒ Object



64
65
66
# File 'lib/fnordmetric/worker.rb', line 64

def expire_event(event_id)
  @redis.expire(event_key(event_id), @opts[:event_data_ttl])
end

#namespace(key) ⇒ Object



72
73
74
# File 'lib/fnordmetric/worker.rb', line 72

def namespace(key)
  (@namespaces[key] || @namespaces.first.last).clone
end

#parse_json(data) ⇒ Object



76
77
78
# File 'lib/fnordmetric/worker.rb', line 76

def parse_json(data)
  Yajl::Parser.new(:symbolize_keys => true).parse(data)
end

#process_event(event_id, event_data) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
# File 'lib/fnordmetric/worker.rb', line 32

def process_event(event_id, event_data)
  EM.defer do      
    parse_json(event_data).tap do |event|                
      event[:_time] ||= Time.now.to_i
      event[:_eid] = event_id
      announce_event(event)
      publish_event(event)        
      expire_event(event_id)       
    end
  end
end

#publish_event(event) ⇒ Object



68
69
70
# File 'lib/fnordmetric/worker.rb', line 68

def publish_event(event)    
  @redis.publish(pubsub_key, event[:_eid])    
end

#pubsub_keyObject



44
45
46
# File 'lib/fnordmetric/worker.rb', line 44

def pubsub_key
  [@opts[:redis_prefix], 'announce'].join("-")
end

#queue_keyObject



48
49
50
# File 'lib/fnordmetric/worker.rb', line 48

def queue_key
  [@opts[:redis_prefix], 'queue'].join("-")
end

#ready!Object



9
10
11
12
# File 'lib/fnordmetric/worker.rb', line 9

def ready!
  @redis = EM::Hiredis.connect(@opts[:redis_uri]) 
  tick
end

#stats_keyObject



56
57
58
# File 'lib/fnordmetric/worker.rb', line 56

def stats_key
  [@opts[:redis_prefix], 'stats'].join("-")
end

#tickObject



21
22
23
24
25
26
27
28
29
30
# File 'lib/fnordmetric/worker.rb', line 21

def tick
  @redis.blpop(queue_key, 0).callback do |list, event_id|           
    @redis.get(event_key(event_id)).callback do |event_data|                     
      process_event(event_id, event_data) if event_data        
      FnordMetric.log("oops, lost an event :(") unless event_data
      EM.next_tick(&method(:tick))      
      @redis.hincrby(stats_key, :events_processed, 1)
    end
  end
end