Class: Interferon::Interferon
- Inherits:
-
Object
- Object
- Interferon::Interferon
- Includes:
- Logging
- Defined in:
- lib/interferon.rb
Constant Summary collapse
- DRY_RUN_ALERTS_NAME_PREFIX =
'[-dry-run-]'.freeze
Instance Attribute Summary collapse
-
#destinations ⇒ Object
Returns the value of attribute destinations.
-
#host_info ⇒ Object
Returns the value of attribute host_info.
-
#host_sources ⇒ Object
Returns the value of attribute host_sources.
Instance Method Summary collapse
- #build_alerts_queue(hosts, alerts, groups) ⇒ Object
- #create_alerts(dest, alerts_queue) ⇒ Object
-
#initialize(config, dry_run = false) ⇒ Interferon
constructor
group_sources is a hash from type => options for each group source; host_sources is a hash from type => options for each host source; destinations is a similar hash from type => options for each alerter.
- #read_alerts ⇒ Object
- #read_groups(sources) ⇒ Object
- #read_hosts(sources) ⇒ Object
- #run ⇒ Object
- #run_update(dest, alerts_queue, existing_alerts) ⇒ Object
- #update_alerts(destinations, hosts, alerts, groups) ⇒ Object
- #update_alerts_on_destination(dest, alerts_queue) ⇒ Object
Methods included from Logging
configure_logger_for, #log, #statsd
Constructor Details
#initialize(config, dry_run = false) ⇒ Interferon
group_sources is a hash from type => options for each group source; host_sources is a hash from type => options for each host source; destinations is a similar hash from type => options for each alerter.
26 27 28 29 30 31 32 33 34 |
# File 'lib/interferon.rb', line 26
#
# Captures the run configuration on the instance.
#
# @param config [Hash] parsed configuration; the keys read here are
#   'alerts_repo_path', 'group_sources', 'host_sources', 'destinations'
#   and 'processes'
# @param dry_run [Boolean] stored as-is in @dry_run
def initialize(config, dry_run = false)
  # run-state flags
  @dry_run = dry_run
  @request_shutdown = false

  # values copied straight from the configuration
  @processes = config['processes']
  @alerts_repo_path = config['alerts_repo_path']
  @destinations = config['destinations']
  @host_sources = config['host_sources']

  # group sources are the only setting that falls back to an empty hash
  configured_group_sources = config['group_sources']
  @group_sources = configured_group_sources || {}
end
Instance Attribute Details
#destinations ⇒ Object
Returns the value of attribute destinations.
19 20 21 |
# File 'lib/interferon.rb', line 19
#
# Reader for the destinations attribute.
#
# @return [Object] the current value of @destinations
def destinations
  @destinations
end
#host_info ⇒ Object
Returns the value of attribute host_info.
19 20 21 |
# File 'lib/interferon.rb', line 19
#
# Reader for the host_info attribute.
#
# @return [Object] the current value of @host_info
def host_info
  @host_info
end
#host_sources ⇒ Object
Returns the value of attribute host_sources.
19 20 21 |
# File 'lib/interferon.rb', line 19
#
# Reader for the host_sources attribute.
#
# @return [Object] the current value of @host_sources
def host_sources
  @host_sources
end
Instance Method Details
#build_alerts_queue(hosts, alerts, groups) ⇒ Object
241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 |
# File 'lib/interferon.rb', line 241
#
# Evaluates each alert in the context of each host and collects the alerts
# that should be sent to destinations.
#
# @param hosts [Array] host contexts; each one is passed to `alert.evaluate`
# @param alerts [Enumerable] alert objects responding to `evaluate` and `[]`
# @param groups [Hash] group name => people, used to expand
#   `alert[:notify][:groups]`
# @return [Array(Hash, Array)] `[alerts_queue, errors]`: `alerts_queue` maps
#   alert name => `[alert_clone, people_set]`; `errors` holds alerts that
#   failed to evaluate on every host and/or applied to no host (an alert can
#   appear once for each of those two reasons)
def build_alerts_queue(hosts, alerts, groups)
  alerts_queue = {}
  all_alert_generation_errors = []

  # create or update alerts; mark when we've done that
  result = Parallel.map(alerts, in_processes: @processes) do |alert|
    # NOTE(review): `break` here is evaluated wherever Parallel runs the
    # block; confirm it behaves as intended with worker processes.
    break if @request_shutdown

    alerts_generated = {}
    alert_generation_errors = []
    counters = {
      errors: 0,
      evals: 0,
      applies: 0,
      hosts: hosts.length,
    }
    last_eval_error = nil

    hosts.each do |hostinfo|
      begin
        alert.evaluate(hostinfo)
        counters[:evals] += 1
      rescue StandardError => e
        log.debug("Evaluation of alert #{alert} failed in the context of host #{hostinfo}")
        counters[:errors] += 1
        last_eval_error = e
        next
      end

      # don't define an alert that doesn't apply to this hostinfo
      unless alert[:applies]
        log.debug("alert #{alert[:name]} doesn't apply to #{hostinfo.inspect}")
        next
      end

      counters[:applies] += 1

      # don't define alerts twice
      next if alerts_generated.key?(alert[:name])

      # figure out who to notify
      people = Set.new(alert[:notify][:people])
      alert[:notify][:groups].each do |g|
        people += (groups[g] || [])
      end

      # queue the alert up for creation; we clone the alert to save the current state
      alerts_generated[alert[:name]] = [alert.clone, people]
    end

    # log some of the counters
    statsd.gauge('alerts.evaluate.errors', counters[:errors], tags: ["alert:#{alert}"])
    statsd.gauge('alerts.evaluate.applies', counters[:applies], tags: ["alert:#{alert}"])

    if counters[:applies] > 0
      log.info("alert #{alert} applies to #{counters[:applies]} of #{counters[:hosts]} hosts")
    end

    # did the alert fail to evaluate on all hosts?
    if counters[:errors] == counters[:hosts] && !last_eval_error.nil?
      log.error("alert #{alert} failed to evaluate in the context of all hosts!")
      log.error("last error on alert #{alert}: #{last_eval_error}")
      statsd.gauge('alerts.evaluate.failed_on_all', 1, tags: ["alert:#{alert}"])
      log.debug(
        "alert #{alert}: " \
        "error #{last_eval_error}\n#{last_eval_error.backtrace.join("\n")}"
      )
      alert_generation_errors << alert
    else
      statsd.gauge('alerts.evaluate.failed_on_all', 0, tags: ["alert:#{alert}"])
    end

    # did the alert apply to any hosts?
    if counters[:applies] == 0
      statsd.gauge('alerts.evaluate.never_applies', 1, tags: ["alert:#{alert}"])
      log.warn("alert #{alert} did not apply to any hosts")
      alert_generation_errors << alert
    else
      statsd.gauge('alerts.evaluate.never_applies', 0, tags: ["alert:#{alert}"])
    end

    [alerts_generated, alert_generation_errors]
  end

  # The shutdown `break` above makes the Parallel.map call evaluate to nil;
  # treat that as "nothing generated" instead of calling #each on nil.
  (result || []).each do |generated_alerts, alert_generation_errors|
    alerts_queue.merge!(generated_alerts)
    all_alert_generation_errors += alert_generation_errors
  end

  [alerts_queue, all_alert_generation_errors]
end
#create_alerts(dest, alerts_queue) ⇒ Object
218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 |
# File 'lib/interferon.rb', line 218
#
# Creates every queued alert on a destination using a small pool of threads.
#
# @param dest [Object] destination responding to `concurrency` and
#   `create_alert(alert, people)`
# @param alerts_queue [Hash] alert name => `[alert, people]`
# @return [Array] the values returned by `dest.create_alert`, in completion
#   order; empty when a shutdown was already requested
def create_alerts(dest, alerts_queue)
  created_ids = []
  pending_names = alerts_queue.keys
  worker_count = dest.concurrency || 10
  return created_ids if @request_shutdown

  workers = Array.new(worker_count) do |index|
    log.info("thread #{index} created")
    worker = Thread.new do
      # each worker pulls names off the shared list until it runs dry
      while (name = pending_names.shift)
        break if @request_shutdown

        queued_alert, recipients = alerts_queue[name]
        log.debug("creating alert for #{queued_alert[:name]}")
        created_ids << dest.create_alert(queued_alert, recipients)
      end
    end
    # a failure in any worker should surface in the calling thread
    worker.tap { |thread| thread.abort_on_exception = true }
  end
  workers.each(&:join)

  created_ids
end
#read_alerts ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/interferon.rb', line 62
#
# Loads every `*.rb` alert definition found in the `alerts` directory of the
# alerts repository.
#
# Aborts the process when the directory does not exist or when any alert file
# fails to load.
#
# @return [Array<Alert>] the alerts that were read successfully
def read_alerts
  alerts = []
  failed = 0

  # validate that alerts path exists
  path = File.expand_path(File.join(@alerts_repo_path, 'alerts'))
  abort("no such directory #{path} for reading alert files") unless Dir.exist?(path)

  Dir.glob(File.join(path, '*.rb')) do |alert_file|
    break if @request_shutdown

    begin
      alert = Alert.new(alert_file)
    rescue StandardError => e
      # keep going so every broken file is reported before aborting below
      log.warn("error reading alert file #{alert_file}: #{e}")
      failed += 1
    else
      alerts << alert
    end
  end

  log.info("read #{alerts.count} alerts files from #{path}")

  statsd.gauge('alerts.read.count', alerts.count)
  statsd.gauge('alerts.read.failed', failed)

  abort("failed to read #{failed} alerts") if failed > 0
  alerts
end
#read_groups(sources) ⇒ Object
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/interferon.rb', line 92
#
# Collects group membership from every configured group source.
#
# @param sources [Object] group source configuration handed to
#   `GroupSourcesLoader#get_all`; must respond to `count`
# @return [Hash] group name => array of people, concatenated across sources
def read_groups(sources)
  loader = GroupSourcesLoader.new([@alerts_repo_path])
  groups = {}

  loader.get_all(sources).each do |source|
    break if @request_shutdown

    # the block exposes the groups gathered so far, should the source yield
    listed = source.list_groups { groups }

    # add all people to groups
    member_total = 0
    listed.each do |group_name, members|
      (groups[group_name] ||= []).concat(members)
      member_total += members.count
    end

    log.info(
      "read #{member_total} people in #{listed.count} groups " \
      "from source #{source.class.name}"
    )
  end

  total_people = groups.values.flatten.count
  log.info(
    "total of #{total_people} people in #{groups.count} groups " \
    "from #{sources.count} sources"
  )

  statsd.gauge('groups.sources', sources.count)
  statsd.gauge('groups.count', groups.count)
  statsd.gauge('groups.people', total_people)

  groups
end
#read_hosts(sources) ⇒ Object
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/interferon.rb', line 125
#
# Collects hosts from every configured host source into one flat list.
#
# @param sources [Object] host source configuration handed to
#   `HostSourcesLoader#get_all`; must respond to `count`
# @return [Array] the flattened hosts from all sources read before any
#   shutdown request
def read_hosts(sources)
  statsd.gauge('hosts.sources', sources.count)

  loader = HostSourcesLoader.new([@alerts_repo_path])
  batches = []

  loader.get_all(sources).each do |source|
    break if @request_shutdown

    batch = source.list_hosts
    batches.push(batch)

    source_name = source.class.name
    statsd.gauge('hosts.count', batch.count, tags: ["source:#{source_name}"])
    log.info("read #{batch.count} hosts from source #{source_name}")
  end

  hosts = batches.flatten
  log.info("total of #{hosts.count} entities from #{sources.count} sources")

  hosts
end
#run ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/interferon.rb', line 36
#
# Runs one full pass: reads alerts, groups and hosts, then updates the alerts
# on every destination. A SIGTERM received during the pass sets
# @request_shutdown, which the other steps check to stop early.
#
# @return [Object] the result of the final log call
def run
  Signal.trap('TERM') do
    # Record the request before logging.
    # NOTE(review): loggers that take a lock may be unable to write from trap
    # context; with the flag set first a failing log call cannot lose the
    # shutdown request.
    @request_shutdown = true
    log.info('SIGTERM received. shutting down gracefully...')
  end

  run_desc = @dry_run ? 'dry run' : 'run'
  log.info("beginning alerts #{run_desc}")

  alerts = read_alerts
  groups = read_groups(@group_sources)
  hosts = read_hosts(@host_sources)

  # every destination gets an options hash; dry runs are flagged in it
  @destinations.each do |dest|
    dest['options'] ||= {}
    dest['options']['dry_run'] = true if @dry_run
  end

  update_alerts(@destinations, hosts, alerts, groups)

  if @request_shutdown
    log.info("interferon #{run_desc} shut down by SIGTERM")
  else
    log.info("interferon #{run_desc} complete")
  end
end
#run_update(dest, alerts_queue, existing_alerts) ⇒ Object
185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 |
# File 'lib/interferon.rb', line 185
#
# Creates the alerts that need updating on a destination, then (outside of
# dry runs) removes existing alerts that are no longer generated.
#
# @param dest [Object] destination responding to `need_update` and
#   `remove_alert`
# @param alerts_queue [Hash] alert name => `[alert, people]`
# @param existing_alerts [Hash] alert name => existing alert record whose
#   'id' entry responds to `length` and `drop`; not modified by this method
# @return [Object, nil] nil for dry runs
def run_update(dest, alerts_queue, existing_alerts)
  updates_queue = alerts_queue.select do |_name, alert_people_pair|
    dest.need_update(alert_people_pair, existing_alerts)
  end

  # Create alerts in destination
  create_alerts(dest, updates_queue)

  # Do not continue to remove alerts during dry-run
  return if @dry_run

  # Existing alerts are pruned until all that remains are
  # alerts that aren't being generated anymore
  to_remove = existing_alerts.dup
  alerts_queue.each do |_name, alert_people_pair|
    alert, _people = alert_people_pair
    old_alerts = to_remove[alert['name']]
    next if old_alerts.nil?

    if old_alerts['id'].length == 1
      to_remove.delete(alert['name'])
    else
      # `dup` above is shallow, so trim a copy of the record rather than the
      # caller's own entry
      remaining = old_alerts.dup
      remaining['id'] = old_alerts['id'].drop(1)
      to_remove[alert['name']] = remaining
    end
  end

  # Clean up alerts not longer being generated
  to_remove.each do |_name, alert|
    break if @request_shutdown
    dest.remove_alert(alert)
  end
end
#update_alerts(destinations, hosts, alerts, groups) ⇒ Object
145 146 147 148 149 150 151 152 153 154 155 156 157 |
# File 'lib/interferon.rb', line 145
#
# Builds the alerts queue and pushes it to every destination.
#
# @param destinations [Object] destination configuration handed to
#   `DestinationsLoader#get_all`
# @param hosts [Array] host contexts used to evaluate the alerts
# @param alerts [Array] alerts to evaluate
# @param groups [Hash] group name => people
# @raise [RuntimeError] during a dry run when any alert failed to evaluate or
#   apply; the message names only those alerts
def update_alerts(destinations, hosts, alerts, groups)
  alerts_queue, alert_errors = build_alerts_queue(hosts, alerts, groups)
  if @dry_run && !alert_errors.empty?
    # report the alerts that actually failed (each once), not every alert
    failed_names = alert_errors.map(&:to_s).uniq
    raise "Alerts failed to apply or evaluate for all hosts: #{failed_names.join(', ')}"
  end

  loader = DestinationsLoader.new([@alerts_repo_path])
  loader.get_all(destinations).each do |dest|
    break if @request_shutdown
    log.info("updating alerts on #{dest.class.name}")
    update_alerts_on_destination(dest, alerts_queue)
  end
end
#update_alerts_on_destination(dest, alerts_queue) ⇒ Object
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 |
# File 'lib/interferon.rb', line 159
#
# Runs the update against a single destination and reports how long it took.
#
# @param dest [Object] destination responding to `existing_alerts`,
#   `report_stats` and `api_errors`
# @param alerts_queue [Hash] alert name => `[alert, people]`
# @raise [RuntimeError] during a dry run when the destination recorded API
#   errors
def update_alerts_on_destination(dest, alerts_queue)
  # track some counters/stats per destination; the monotonic clock keeps the
  # measured duration unaffected by wall-clock adjustments
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  # get already-defined alerts
  existing_alerts = dest.existing_alerts

  run_update(dest, alerts_queue, existing_alerts)

  unless @request_shutdown
    # run time summary
    run_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
    statsd.histogram(
      @dry_run ? 'destinations.run_time.dry_run' : 'destinations.run_time',
      run_time,
      tags: ["destination:#{dest.class.name}"]
    )
    # the class name is passed as an argument so it is never read as a
    # format directive
    log.info(format('%s : run completed in %.2f seconds', dest.class.name, run_time))

    # report destination stats
    dest.report_stats
  end

  raise dest.api_errors.to_s if @dry_run && !dest.api_errors.empty?
end