Class: REC::Correlator
Overview
The Correlator reads in log entries, matches them against the ruleset, creates states as necessary, generates new (correlated) events, and sends alerts.
Constant Summary
- @@now = Time.now()
  The current time from the perspective of the latest event processed, not necessarily actual clock time.
- @@eventsIn = 0
- @@eventsMissed = 0
Class Method Summary
- .now ⇒ Object
  Makes the current processing time effectively global.
- .start(opts = {}) ⇒ Object
  Convenience method to create and start a correlator.
Instance Method Summary
- #finish ⇒ Object
  Stop correlating, close IO streams and exit.
- #initialize ⇒ Correlator (constructor)
  Create a new Correlator.
- #parse(logLine) ⇒ Object
  Parses a log entry into a timestamp and a message.
- #run ⇒ Object
  Reads the next input log entry, parses it into a timestamp and a message, and checks the message against the ruleset.
- #start ⇒ Object
  Start a Correlator.
- #stats ⇒ Object
  Reports statistics to stderr.
Constructor Details
#initialize ⇒ Correlator
Create a new Correlator.

    # File 'lib/rec/correlator.rb', line 28
    def initialize()
      @startupTime = Time.now()
      @year = @startupTime.year
      @running = false
    end
Class Method Details
.now ⇒ Object
Makes the current processing time effectively global.

    # File 'lib/rec/correlator.rb', line 23
    def self.now()
      @@now
    end
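The idea of a processing clock that follows the events rather than the wall clock can be illustrated with a minimal sketch that is independent of the rec source. `EventClock`, `advance` and `expired?` are hypothetical names chosen for this example; the only thing taken from the documentation above is that "now" tracks the timestamp of the latest event processed.

```ruby
# Hypothetical illustration, not part of rec: a class-level clock
# that follows event timestamps instead of the wall clock.
class EventClock
  @now = Time.now # starts at wall-clock time, like @@now above

  class << self
    attr_reader :now

    # Move the clock to the timestamp of the event just processed.
    def advance(event_time)
      @now = event_time
    end

    # True when the deadline has passed, judged by the events seen so far.
    def expired?(deadline)
      @now >= deadline
    end
  end
end

# Replaying an old log: lifetimes are judged against log time.
first = Time.local(2012, 4, 22, 16, 40, 18)
EventClock.advance(first)
deadline = first + 60                       # something meant to live 60 seconds
expired_early = EventClock.expired?(deadline)
EventClock.advance(first + 90)              # a later event arrives
expired_late = EventClock.expired?(deadline)
```

With a clock of this kind, replaying a historical log file plausibly produces the same expiry decisions as processing the same entries live, because nothing depends on when the replay happens.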
.start(opts = {}) ⇒ Object
Convenience method to create and start a correlator. Possible options are:
- :debug => true

    # File 'lib/rec/correlator.rb', line 17
    def self.start(opts={})
      $debug = opts[:debug] || false
      self.new().start()
    end
Instance Method Details
#finish ⇒ Object
Stop correlating, close IO streams and exit.

    # File 'lib/rec/correlator.rb', line 91
    def finish()
      @running = false
      $miss.close() unless $miss.nil? or $miss.closed?
      # NOTE: some states may have something useful to say, or maybe we could store them
      stats()
      $stderr.puts("rec is finished.")
      exit 0
    end
#parse(logLine) ⇒ Object
Parses a log entry into a timestamp and a message. Handles formats like:
- Apr 22 16:40:18 aqua Firewall: …
- [err] Fri Dec 30 23:58:56 2011 - scan error: …
- 2012-04-22 08:43:22.099 EST - Module: …
Otherwise time stands still.

    # File 'lib/rec/correlator.rb', line 105
    def parse(logLine)
      if logLine =~ /^(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s+(\d+)\s(\d\d)\:(\d\d)\:(\d\d)\s+/
        # Apr 22 16:40:18 aqua Firewall[205]: Skype is listening from 0.0.0.0:51304 proto=6
        time = Time.local(@year, $1, $2, $3, $4, $5, 0)
        message = $'
      elsif logLine =~ /^(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s+(\d+),\s+(\d{4})\s(\d+)\:(\d\d)\:(\d\d)\s+(AM|PM)\s+/
        # Oct 1, 2012 1:42:04 PM org.apache.catalina.startup.Catalina start
        hour = ($7 == "PM") ? (($4.to_i % 12) + 12) : $4 # convert PM to 24-hour style
        time = Time.local($3, $1, $2, hour, $5, $6, 0)
        message = $'
      elsif logLine =~ /^\[\w+\]\s(Sun|Mon|Tue|Wed|Thu|Fri|Sat)\s(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s+(\d+)\s(\d\d)\:(\d\d)\:(\d\d)\s(\d{4})/
        # [err] Fri Dec 30 23:58:56 2011 - scan error: 451 SCAN Engine error 2 ...
        time = Time.local($7, $2, $3, $4, $5, $6, 0)
        message = $'
      elsif logLine =~ /^(\d{4})\-(\d\d)\-(\d\d)[\sT](\d\d)\:(\d\d)\:(\d\d)(\S*)\s+/
        # 2012-04-22 08:43:22.099 EST - Module: PlistFile ...
        # 2012-10-01T11:45:44+10:00 proc=sendmail status=running
        if not $7.empty? and ($7.include?("UTC") or $7.include?("GMT"))
          time = Time.utc($1, $2, $3, $4, $5, $6)
        else
          time = Time.local($1, $2, $3, $4, $5, $6)
        end
        message = $'
      else
        time = Correlator.now().freeze() # time stands still
        message = logLine
      end
      [time, message]
    end
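The first and last branches of the method can be tried in isolation with a small standalone sketch. `parse_syslog` and `SYSLOG_STAMP` are hypothetical names, and the year and fallback time are passed in explicitly here (an assumption made so the example runs without a Correlator instance); it is a simplified illustration, not the library's method.

```ruby
# Hypothetical helper, not part of rec: parse a syslog-style line that lacks a year.
SYSLOG_STAMP = /\A(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s+(\d+)\s(\d\d):(\d\d):(\d\d)\s+/

def parse_syslog(line, year, fallback_time)
  match = SYSLOG_STAMP.match(line)
  # No recognizable timestamp: time stands still and the whole line is the message.
  return [fallback_time, line] if match.nil?

  month, day, hour, min, sec = match.captures
  # Time.local accepts three-letter English month names such as "Apr".
  time = Time.local(year, month, day.to_i, hour.to_i, min.to_i, sec.to_i)
  [time, match.post_match] # post_match plays the role of $' in the listing above
end

frozen = Time.local(2012, 1, 1, 0, 0, 0)
time, message = parse_syslog("Apr 22 16:40:18 aqua Firewall[205]: Skype is listening", 2012, frozen)
still, whole = parse_syslog("no timestamp here", 2012, frozen)
```

Because this format carries no year, the year has to come from somewhere else; the constructor above records the year at startup for that purpose.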
#run ⇒ Object
Reads the next input log entry, parses it into a timestamp and a message, and checks the message against the ruleset. Loops continuously until the log input stream is closed or the process is interrupted by a signal.

    # File 'lib/rec/correlator.rb', line 55
    def run()
      while @running and !$stdin.closed? do
        logLine = gets()
        if logLine.nil?
          # wait nicely for more input
          sleep 1
          next
        end
        logLine.strip!()
        next if logLine.empty?
        @@eventsIn += 1
        @@now, message = parse(logLine)
        $stderr.puts("< "+message) if $debug
        State.expire_states() # remove expired states before we check the rules
        eventMatched = false
        Rule.each { |rule|
          matchok, title = rule.check(message)
          eventMatched ||= matchok
          next if title.nil?
          break if title.empty? # match without a message means 'swallow this event'
          state = State[title] || rule.create_state(title)
          rule.react(state, logLine)
          $stderr.puts("breaking after rule #{rule.rid}") unless (!$debug or rule.continue())
          break unless rule.continue()
        }
        if !eventMatched
          @@eventsMissed += 1
          $miss.puts("* "+logLine) unless $miss.nil?
        end
        $miss.flush()
        $stdout.flush()
        $stderr.flush()
      end
      finish() if $stdin.eof?
    end
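The control flow inside the rule loop (skip, swallow, react, then stop or continue) can be sketched without the rest of the library. `SketchRule` and `dispatch` are hypothetical stand-ins, and the sketch assumes a plain regular expression in place of the real rule check; state handling is reduced to recording which rule would have reacted.

```ruby
# Hypothetical stand-in for a rule: an id, a pattern, a state title, a continue flag.
SketchRule = Struct.new(:rid, :pattern, :title, :continue)

# Returns [matched, ids of rules that reacted] for one message.
def dispatch(rules, message)
  matched = false
  reacted = []
  rules.each do |rule|
    hit = rule.pattern.match?(message)
    matched ||= hit
    next unless hit               # no match: try the next rule
    break if rule.title.empty?    # match with an empty title: swallow the event
    reacted << rule.rid           # the real loop finds or creates a state and reacts here
    break unless rule.continue    # stop unless the rule lets later rules run
  end
  [matched, reacted]
end

rules = [
  SketchRule.new(1, /debug/, "", false),                    # swallow noisy lines
  SketchRule.new(2, /login failed/, "Failed login", true),  # react, then continue
  SketchRule.new(3, /login/, "Login activity", false)       # react, then stop
]

swallowed = dispatch(rules, "debug: login failed")
both = dispatch(rules, "login failed for root")
missed = dispatch(rules, "disk full")
```

An event that no rule matches, like the last one, corresponds to what the loop above counts as missed and writes to `$miss`.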
#start ⇒ Object
Start a Correlator. INT and TERM signals will stop the Correlator. A USR1 signal will cause the Correlator to display statistics and continue running.
Missed events are written to file descriptor 3, in case they are of interest (typically while testing rulesets).

    # File 'lib/rec/correlator.rb', line 39
    def start()
      Signal.trap("INT") { finish() }
      Signal.trap("TERM") { finish() }
      Signal.trap("USR1") {
        stats()
        run()
      }
      $stderr.puts("rec is starting...")
      $miss ||= IO.open(3, "a") # write out to &3 for missed events, unless $miss is pre-defined
      @running = true
      run()
    end
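The signal behaviour described above can be tried out with a small standalone sketch on a POSIX system. The variables `running` and `reports` are hypothetical and replace the real `finish` and `stats` calls; the process signals itself only to simulate an operator running `kill` from a shell.

```ruby
# Hypothetical sketch, not part of rec: USR1 reports, TERM requests a stop.
running = true
reports = 0

Signal.trap("USR1") { reports += 1 }     # report and keep running
Signal.trap("TERM") { running = false }  # ask the main loop to stop

Process.kill("USR1", Process.pid)        # simulates `kill -USR1 <pid>`
sleep 0.2                                # give the handler a chance to run
still_running = running

Process.kill("TERM", Process.pid)        # simulates `kill -TERM <pid>`
sleep 0.2
```

Setting a flag in the handler and letting the main loop act on it is one common design; the method above instead calls `finish()` and `stats()` directly from its handlers.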
#stats ⇒ Object
Reports statistics to stderr.

    # File 'lib/rec/correlator.rb', line 136
    def stats()
      checked, matched, created, reacted, rules = Rule.stats()
      statesCount, eventsOut = State.stats()
      $stderr.puts("-"*40)
      $stderr.puts("srec has been running for %.1fs" % [Time.now - @startupTime])
      $stderr.puts("events: in %-8d out %-8d missed %-8d" % [@@eventsIn, eventsOut, @@eventsMissed])
      $stderr.puts("states: active %-8d created %-8d " % [statesCount, created.values.reduce(:+)])
      $stderr.puts("rules: checked %-8d matched %-8d reacted %-8d" % [checked.values.reduce(:+),matched.values.reduce(:+), reacted.values.reduce(:+)])
      $stderr.puts("Rule ID checked matched freq % reacted ")
      #checked.keys.sort { |a,b| matched[b] <=> matched[a] }.each {|rid| # descending by matches
      rules.collect { |rule| rule.rid }.each { |rid|
        if checked[rid] > 0
          freq = "%5.2f" % [matched[rid].to_f() / checked[rid].to_f() * 100]
        else
          freq = "never"
        end
        $stderr.puts("%-8d %-8d %-8d %-8s %-8d" % [rid, checked[rid], matched[rid], freq, reacted[rid]])
      }
      $stderr.flush()
    end
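The per-rule "freq %" column is the share of checks that matched. A minimal sketch of that calculation and of the row layout follows; `match_frequency` and `stats_row` are hypothetical helper names, and plain integers stand in for the hashes returned by `Rule.stats()`.

```ruby
# Hypothetical helpers, not part of rec: reproduce the "freq %" column and one table row.
def match_frequency(matched, checked)
  # A rule that was never checked has no meaningful percentage.
  return "never" unless checked > 0
  "%5.2f" % [matched.to_f / checked * 100]
end

def stats_row(rid, checked, matched, reacted)
  # Left-justified, eight-character columns, as in the listing above.
  "%-8d %-8d %-8d %-8s %-8d" % [rid, checked, matched, match_frequency(matched, checked), reacted]
end

three_of_four = match_frequency(3, 4)
unchecked = match_frequency(0, 0)
row = stats_row(7, 4, 3, 2)
```

Guarding on `checked > 0` avoids a 0/0 division, which for floats would otherwise yield `NaN` in the table.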