Class: FnordMetric::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/fnordmetric/worker.rb

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Worker

Returns a new instance of Worker.



3
4
5
6
7
8
# File 'lib/fnordmetric/worker.rb', line 3

def initialize(opts = {})
  @namespaces = FnordMetric.namespaces
  @opts = FnordMetric.options(opts)

  FnordMetric.register(self)
end

Instance Method Details

#announce_event(event) ⇒ Object



57
58
59
# File 'lib/fnordmetric/worker.rb', line 57

def announce_event(event)
  namespace(event[:_namespace]).ready!(redis, sync_redis).announce(event)
end

#event_key(event_id) ⇒ Object



49
50
51
# File 'lib/fnordmetric/worker.rb', line 49

def event_key(event_id)
  [@opts[:redis_prefix], 'event', event_id].join("-")
end

#expire_event(event_id) ⇒ Object



61
62
63
# File 'lib/fnordmetric/worker.rb', line 61

def expire_event(event_id)
  redis.expire(event_key(event_id), @opts[:event_data_ttl])
end

#initializedObject



10
11
12
13
# File 'lib/fnordmetric/worker.rb', line 10

def initialized
  FnordMetric.log("worker started")
  EM.next_tick(&method(:tick))
end

#namespace(key) ⇒ Object



69
70
71
# File 'lib/fnordmetric/worker.rb', line 69

def namespace(key)
  (@namespaces[key] || @namespaces.first.last).clone
end

#parse_json(data) ⇒ Object



73
74
75
76
77
78
79
# File 'lib/fnordmetric/worker.rb', line 73

def parse_json(data)
  event = Yajl::Parser.new(:symbolize_keys => true).parse(data)
  event[:_namespace] = event[:_namespace].to_sym if event[:_namespace]
  event
rescue Yajl::ParseError => e
  FnordMetric.error "invalid json: #{e.to_s}"; false
end

#process_event(event_id, event_data) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/fnordmetric/worker.rb', line 28

def process_event(event_id, event_data)
  EM.next_tick do
    event = parse_json(event_data)
    if event
      event[:_time] ||= Time.now.to_i
      event[:_eid] = event_id
      announce_event(event)
      publish_event(event)
      expire_event(event_id)
    end
  end
end

#publish_event(event) ⇒ Object



65
66
67
# File 'lib/fnordmetric/worker.rb', line 65

def publish_event(event)    
  redis.publish(pubsub_key, event.to_json)
end

#pubsub_keyObject



41
42
43
# File 'lib/fnordmetric/worker.rb', line 41

def pubsub_key
  [@opts[:redis_prefix], 'announce'].join("-")
end

#queue_keyObject



45
46
47
# File 'lib/fnordmetric/worker.rb', line 45

def queue_key
  [@opts[:redis_prefix], 'queue'].join("-")
end

#redisObject



81
82
83
# File 'lib/fnordmetric/worker.rb', line 81

def redis
  @redis ||= EM::Hiredis.connect(FnordMetric.options[:redis_url]) # FIXPAUL
end

#stats_keyObject



53
54
55
# File 'lib/fnordmetric/worker.rb', line 53

def stats_key
  [@opts[:redis_prefix], 'stats'].join("-")
end

#sync_redisObject



85
86
87
# File 'lib/fnordmetric/worker.rb', line 85

def sync_redis
  @sync_redis ||= FnordMetric.mk_redis
end

#tickObject



15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/fnordmetric/worker.rb', line 15

def tick
  redis.blpop(queue_key, 1).callback do |list, event_id|
    EM.next_tick(&method(:tick))
    if event_id
      redis.get(event_key(event_id)).callback do |event_data|
        process_event(event_id, event_data) if event_data
        FnordMetric.log("event_lost: event_data not found for event-id '#{event_id}' - maybe expired?") unless event_data
        redis.hincrby(stats_key, :events_processed, 1)
      end
    end
  end
end