Class: Bosh::Monitor::EventProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/bosh/monitor/event_processor.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ EventProcessor

Returns a new instance of EventProcessor.



5
6
7
8
9
10
11
# File 'lib/bosh/monitor/event_processor.rb', line 5

# Builds a processor that starts with no recorded events and no
# registered plugins.
def initialize
  @logger = Bhm.logger
  # Guards @events and plugin registration in the methods below.
  @lock = Mutex.new

  # kind (Symbol) => { event id => { :received_at => Integer } }; see #process.
  @events = {}
  # kind (Symbol) => Set of plugins; see #add_plugin.
  @plugins = {}
end

Instance Attribute Details

#plugins ⇒ Object (readonly)

Returns the value of attribute plugins.



3
4
5
# File 'lib/bosh/monitor/event_processor.rb', line 3

# Read-only accessor for the plugin registry.
#
# Returns the registry object itself (not a copy): the Hash created in
# #initialize, which #add_plugin fills with `kind (Symbol) => Set of plugins`.
def plugins
  @plugins
end

Instance Method Details

#add_plugin(plugin, event_kinds = []) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/bosh/monitor/event_processor.rb', line 13

# Registers +plugin+ for each of the given event kinds and then starts it.
#
# @param plugin      object responding to +run+; if it also responds to
#                    +validate_options+, a falsy result rejects the plugin
# @param event_kinds list of kinds (anything responding to +to_sym+) the
#                    plugin wants to receive
# @return whatever +plugin.run+ returns
# @raise [FatalError] when the plugin reports invalid options
def add_plugin(plugin, event_kinds = [])
  options_rejected = plugin.respond_to?(:validate_options) && !plugin.validate_options
  raise FatalError, "Invalid plugin options for '#{plugin.class}'" if options_rejected

  @lock.synchronize do
    event_kinds.each do |event_kind|
      # One Set per kind, created lazily, so a plugin is never listed twice.
      (@plugins[event_kind.to_sym] ||= Set.new) << plugin
    end

    # The plugin is started while the lock is still held, even when
    # event_kinds is empty.
    plugin.run
  end
end

#enable_pruning(interval) ⇒ Object



65
66
67
68
69
70
71
72
73
74
# File 'lib/bosh/monitor/event_processor.rb', line 65

# Starts (at most once) a background thread that prunes stored events
# every +interval+ seconds, using +interval+ as the event lifetime too.
#
# @param interval number of seconds, passed both to #prune_events and +sleep+
# @return [Thread] the pruning thread; later calls return the same thread
#   and ignore their argument
def enable_pruning(interval)
  return @reaper if @reaper

  @reaper = Thread.new do
    loop do
      # Pruning and sleeping use the same interval, so an event may stay
      # around for up to 2 * interval seconds; accepted as a trade-off.
      prune_events(interval)
      sleep(interval)
    end
  end
end

#events_count ⇒ Object



56
57
58
59
60
61
62
63
# File 'lib/bosh/monitor/event_processor.rb', line 56

# Counts the events currently remembered, summed over every event kind.
#
# @return [Integer] total number of stored event ids (0 when none)
def events_count
  @lock.synchronize do
    # Only the per-kind tables matter here; the block's value becomes the
    # next accumulator, so no reassignment is needed.
    @events.each_value.inject(0) { |total, events_by_id| total + events_by_id.size }
  end
end

#process(kind, data) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/bosh/monitor/event_processor.rb', line 28

# Builds an event from +data+, drops it if an event of the same kind and id
# was already seen, and otherwise hands it to every plugin registered for
# that kind.
#
# @param kind event kind; converted with +to_sym+
# @param data payload passed to Bhm::Events::Base.create!
# @return [true] in every non-raising path (duplicate, no plugins, delivered)
def process(kind, data)
  kind = kind.to_sym
  event = Bhm::Events::Base.create!(kind, data)

  recipients = @lock.synchronize do
    seen = (@events[kind] ||= {})

    if seen.key?(event.id)
      @logger.debug("Ignoring duplicate #{event.kind} '#{event.id}'")
      return true
    end
    # We don't really need to store event itself for the moment,
    # as we only use its id to dedup new events.
    seen[event.id] = { :received_at => Time.now.to_i }

    # Snapshot the interested plugins while holding the same lock that
    # #add_plugin mutates the registry under, so a registration happening
    # during dispatch cannot disturb the iteration below.
    # nil.to_a is [], which covers kinds nobody registered for.
    @plugins[kind].to_a
  end

  if recipients.empty?
    @logger.debug("No plugins are interested in '#{event.kind}' event")
    return true
  end

  # Plugins are invoked outside the lock.
  recipients.each { |plugin| plugin_process(plugin, event) }

  true
end

#prune_events(lifetime) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/bosh/monitor/event_processor.rb', line 76

# Forgets every stored event whose +:received_at+ timestamp is at least
# +lifetime+ seconds old, then logs how many were pruned and how many were
# examined in total.
#
# Any StandardError raised while pruning is logged and swallowed, so this
# method does not propagate such errors to its caller.
#
# @param lifetime maximum age, in seconds, an event may have and still be kept
def prune_events(lifetime)
  @lock.synchronize do
    # Computed once so every event in this pass is judged against the same
    # cutoff, even if the wall clock ticks over while pruning.
    cutoff = Time.now.to_i - lifetime

    pruned_count = 0
    total_count = 0

    @events.each_value do |events_by_id|
      size_before = events_by_id.size
      events_by_id.delete_if { |_id, meta| meta[:received_at] <= cutoff }

      total_count += size_before
      pruned_count += size_before - events_by_id.size
    end

    @logger.debug("Pruned %s" % [ pluralize(pruned_count, "old event") ])
    @logger.debug("Total %s" % [ pluralize(total_count, "event") ])
  end
rescue => e
  @logger.error("Error pruning events: #{e}")
  @logger.error(e.backtrace.join("\n"))
end