Class: Bosh::Monitor::EventProcessor
- Inherits:
-
Object
- Object
- Bosh::Monitor::EventProcessor
- Defined in:
- lib/bosh/monitor/event_processor.rb
Instance Attribute Summary
-
#plugins ⇒ Object
readonly
Returns the value of attribute plugins.
Instance Method Summary
- #add_plugin(plugin, event_kinds = []) ⇒ Object
- #enable_pruning(interval) ⇒ Object
- #events_count ⇒ Object
-
#initialize ⇒ EventProcessor
constructor
A new instance of EventProcessor.
- #process(kind, data) ⇒ Object
- #prune_events(lifetime) ⇒ Object
Constructor Details
#initialize ⇒ EventProcessor
Returns a new instance of EventProcessor.
5 6 7 8 9 10 11 |
# File 'lib/bosh/monitor/event_processor.rb', line 5
#
# Builds an empty processor: no events recorded and no plugins registered.
def initialize
  # Both registries start out as empty hashes and are filled by the
  # other instance methods.
  @events  = {}
  @plugins = {}

  # Collaborators: the mutex that guards the registries and the shared
  # logger obtained from Bhm.
  @lock   = Mutex.new
  @logger = Bhm.logger
end
Instance Attribute Details
#plugins ⇒ Object (readonly)
Returns the value of attribute plugins.
3 4 5 |
# File 'lib/bosh/monitor/event_processor.rb', line 3 def plugins @plugins end |
Instance Method Details
#add_plugin(plugin, event_kinds = []) ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/bosh/monitor/event_processor.rb', line 13
#
# Registers +plugin+ as a subscriber for every kind in +event_kinds+ and
# then starts it by calling +plugin.run+.
#
# plugin      - any object responding to +run+; if it also responds to
#               +validate_options+, that method must return a truthy value.
# event_kinds - list of event kinds (anything responding to +to_sym+).
#               Defaults to an empty list, in which case the plugin is
#               only started.
#
# Returns whatever +plugin.run+ returns.
# Raises FatalError when the plugin reports invalid options; in that case
# nothing is registered and the plugin is not started.
def add_plugin(plugin, event_kinds = [])
  if plugin.respond_to?(:validate_options) && !plugin.validate_options
    raise FatalError, "Invalid plugin options for '#{plugin.class}'"
  end

  @lock.synchronize do
    event_kinds.each do |kind|
      # One Set per kind, so registering the same plugin twice is a no-op.
      (@plugins[kind.to_sym] ||= Set.new) << plugin
    end

    plugin.run
  end
end
#enable_pruning(interval) ⇒ Object
65 66 67 68 69 70 71 72 73 74 |
# File 'lib/bosh/monitor/event_processor.rb', line 65
#
# Starts the background thread that periodically prunes old events.
# Only one such thread is ever created per processor: later calls return
# the thread that is already stored in @reaper.
#
# interval - number of seconds, used both as the lifetime handed to
#            #prune_events and as the pause between two pruning passes.
#
# Returns the reaper Thread.
def enable_pruning(interval)
  return @reaper if @reaper

  @reaper = Thread.new do
    loop do
      # Because the lifetime and the pause are the same value, an event
      # may stay around for up to 2 * interval seconds; that is accepted
      # as a reasonable trade-off.
      prune_events(interval)
      sleep(interval)
    end
  end
end
#events_count ⇒ Object
56 57 58 59 60 61 62 63 |
# File 'lib/bosh/monitor/event_processor.rb', line 56
#
# Counts the events currently recorded, summed over all event kinds.
# The count is taken while holding @lock.
#
# Returns an Integer (0 when nothing has been recorded).
def events_count
  @lock.synchronize do
    # @events maps kind => { event id => metadata }; only the bucket sizes
    # matter here, so the kinds themselves are not needed.
    @events.each_value.inject(0) { |total, bucket| total + bucket.size }
  end
end
#process(kind, data) ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/bosh/monitor/event_processor.rb', line 28
#
# Turns (+kind+, +data+) into an event, drops it if an event with the same
# id was already seen for that kind, and otherwise hands it to every
# plugin registered for the kind.
#
# kind - event kind; converted with +to_sym+.
# data - payload passed to Bhm::Events::Base.create!.
#
# Returns true in all non-raising cases (duplicate, no interested
# plugins, or delivered). Anything raised by Bhm::Events::Base.create!
# propagates to the caller.
def process(kind, data)
  kind = kind.to_sym
  event = Bhm::Events::Base.create!(kind, data)

  interested = @lock.synchronize do
    seen = (@events[kind] ||= {})

    if seen.key?(event.id)
      @logger.debug("Ignoring duplicate #{event.kind} '#{event.id}'")
      return true
    end

    # The event itself is not stored for now: only its id is needed to
    # de-duplicate later events, plus the arrival time used for pruning.
    seen[event.id] = { :received_at => Time.now.to_i }

    # Snapshot the subscribers while still holding the lock: #add_plugin
    # mutates these sets under the same lock, so iterating the live set
    # outside of it could observe a concurrent modification.
    subscribers = @plugins[kind]
    subscribers ? subscribers.to_a : []
  end

  if interested.empty?
    @logger.debug("No plugins are interested in '#{event.kind}' event")
    return true
  end

  # Plugins are invoked outside the lock, as before.
  interested.each { |plugin| plugin_process(plugin, event) }

  true
end
#prune_events(lifetime) ⇒ Object
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/bosh/monitor/event_processor.rb', line 76
#
# Removes every recorded event whose +:received_at+ timestamp is
# +lifetime+ seconds old or older, then logs how many events were pruned
# and how many were examined in total (counted before pruning).
#
# lifetime - maximum age of an event, in seconds.
#
# StandardErrors raised while pruning are not propagated: the message and
# backtrace are written to the logger instead.
def prune_events(lifetime)
  @lock.synchronize do
    # Compute the cutoff once so that every event in this pass is judged
    # against the same instant, instead of re-reading the clock per event.
    cutoff = Time.now.to_i - lifetime
    pruned_count = 0
    total_count = 0

    @events.each_value do |list|
      size_before = list.size
      total_count += size_before

      list.delete_if { |_id, data| data[:received_at] <= cutoff }

      pruned_count += size_before - list.size
    end

    @logger.debug("Pruned %s" % [ pluralize(pruned_count, "old event") ])
    @logger.debug("Total %s" % [ pluralize(total_count, "event") ])
  end
rescue => e
  @logger.error("Error pruning events: #{e}")
  @logger.error(e.backtrace.join("\n"))
end