Class: Anschel::Filter
- Inherits:
-
Object
- Object
- Anschel::Filter
- Defined in:
- lib/anschel/filter.rb,
lib/anschel/filter/tag.rb,
lib/anschel/filter/gsub.rb,
lib/anschel/filter/scan.rb,
lib/anschel/filter/debug.rb,
lib/anschel/filter/index.rb,
lib/anschel/filter/parse.rb,
lib/anschel/filter/stamp.rb,
lib/anschel/filter/convert.rb
Instance Attribute Summary collapse
-
#filters ⇒ Object
readonly
Returns the value of attribute filters.
Instance Method Summary collapse
- #apply(event) ⇒ Object
- #convert(conf, stats, log) ⇒ Object
- #debug(conf, stats, log) ⇒ Object
- #gsub(conf, stats, log) ⇒ Object
- #index(conf, stats, log) ⇒ Object
-
#initialize(config, stats, log) ⇒ Filter
constructor
A new instance of Filter.
- #parse(conf, stats, log) ⇒ Object
- #scan(conf, stats, log) ⇒ Object
- #stamp(conf, stats, log) ⇒ Object
- #tag(conf, stats, log) ⇒ Object
Constructor Details
#initialize(config, stats, log) ⇒ Filter
Returns a new instance of Filter.
16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/anschel/filter.rb', line 16
# Compiles the filter configuration into per-event-type lists of filters.
#
# +config+ maps an event type to a list of filter definitions. For each
# definition only its first key is consulted: that key names the builder
# method to call and its value is the builder's settings. Whatever the
# builder returns is appended to +@filters[event_type]+.
#
# NOTE(review): the builder name comes straight from +config+ and is
# dispatched with +send+ -- presumably the configuration is trusted; confirm.
#
# @param config [Hash, nil] event type => list of filter definitions
# @param stats [Object] handed unchanged to every builder
# @param log [Object] must respond to +info+; also handed to every builder
def initialize config, stats, log
  log.info event: 'filter-loading'
  log.info event: 'filter-config', config: config

  # Unknown event types read as an (auto-created) empty list.
  @filters = Hash.new { |table, type| table[type] = [] }

  # A nil config simply means "no filters".
  (config || {}).each do |event_type, definitions|
    definitions.each do |definition|
      kind = definition.keys.first
      @filters[event_type] << send(kind, definition[kind], stats, log)
    end
  end

  log.info event: 'filter-fully-loaded'
end
Instance Attribute Details
#filters ⇒ Object (readonly)
Returns the value of attribute filters.
14 15 16 |
# File 'lib/anschel/filter.rb', line 14
# Read-only accessor for the filter table.
#
# @return [Object] the current value of the +@filters+ instance variable
#   (the constructor assigns a Hash of event type => list of filters)
def filters
  @filters
end
Instance Method Details
#apply(event) ⇒ Object
32 33 34 35 36 37 38 39 |
# File 'lib/anschel/filter.rb', line 32
# Runs an event through the filters registered under +:_before+, then those
# registered under the event's own type, then those under +:_after+.
#
# @param event [Hash] event to filter; must carry a truthy +:type+
# @return [Hash] the same +event+ object that was passed in
# @raise [RuntimeError] when the event has no +:type+
def apply event
  raise 'Event does not have a "type" field' unless event[:type]

  # Capture the type up front, in case a filter modifies it.
  kind = event[:type].to_sym

  [:_before, kind, :_after].each do |stage|
    filters[stage].each { |filter| filter.call event }
  end

  event
end
#convert(conf, stats, log) ⇒ Object
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/anschel/filter/convert.rb', line 9
# Builds a filter that coerces one event field to an integer, float or string.
#
# Consumes +:field+ and +:type+ from +conf+ (both are deleted from it); the
# remaining +conf+ is handed to +filtered+ (defined outside this block)
# together with each converted event.
#
# @param conf [Hash] settings; +:field+ and +:type+ are required, and +:type+
#   must be 'integer', 'float' or 'string' (String or Symbol)
# @param stats [Object] must respond to +create+ and +inc+
# @param log [Object] must respond to +info+
# @return [Proc] lambda taking an event Hash
# @raise [RuntimeError] when +:field+ or +:type+ is missing, or +:type+ is not
#   one of the supported names
def convert conf, stats, log
  field = conf.delete :field
  type  = conf.delete :type

  raise 'Missing required "field" for "convert" filter' if field.nil?
  raise 'Missing required "type" for "convert" filter' if type.nil?

  field = field.to_sym

  type_conversions = {
    'integer' => :to_i,
    'float'   => :to_f,
    'string'  => :to_s
  }

  # Resolve the conversion once, and reject unknown types now rather than
  # failing on every event with a TypeError from `send nil`.
  conversion = type_conversions[type.to_s]
  if conversion.nil?
    raise %(Unknown "type" for "convert" filter: #{type.inspect})
  end

  stats.create 'filter-convert'
  stats.create 'filter-convert-skipped'

  log.info event: 'filter-compiled', kind: 'convert',
    field: field, type: type

  lambda do |event|
    # Events without the field pass through untouched.
    unless event.has_key? field
      stats.inc 'filter-convert-skipped'
      return event
    end

    event[field] = event[field].send conversion
    stats.inc 'filter-convert'
    filtered event, conf
  end
end
#debug(conf, stats, log) ⇒ Object
6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/anschel/filter/debug.rb', line 6
# Builds a pass-through filter that writes every event to the debug log.
#
# @param conf [Object] unused by this builder
# @param stats [Object] unused by this builder
# @param log [Object] must respond to +info+ and +debug+
# @return [Proc] lambda taking an event and returning it unchanged
def debug conf, stats, log
  log.info(event: 'filter-compiled', kind: 'debug')

  ->(event) do
    log.debug(event: 'debug', event_repr: event.inspect, raw_event: event)
    event
  end
end
#gsub(conf, stats, log) ⇒ Object
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/anschel/filter/gsub.rb', line 10
# Builds a filter that replaces every match of one or more patterns in a
# single event field.
#
# Consumes +:field+, +:match+ and +:replace+ from +conf+ (all three are
# deleted from it); the remaining +conf+ is handed to +filtered+ (defined
# outside this block) together with each rewritten event.
#
# @param conf [Hash] settings; +:field+, +:match+ (pattern or Array of
#   patterns, each passed to Regexp.new) and +:replace+ are required
# @param stats [Object] must respond to +create+ and +inc+
# @param log [Object] must respond to +info+
# @return [Proc] lambda taking an event Hash
# @raise [RuntimeError] when a required setting is missing
def gsub conf, stats, log
  field   = conf.delete :field
  match   = conf.delete :match
  replace = conf.delete :replace

  raise 'Missing required "field" for "gsub" filter' if field.nil?
  raise 'Missing required "match" for "gsub" filter' if match.nil?
  raise 'Missing required "replace" for "gsub" filter' if replace.nil?

  field = field.to_sym
  match = match.is_a?(Array) ? match : [ match ]
  match = match.map { |m| Regexp.new m }

  stats.create 'filter-gsub'
  stats.create 'filter-gsub-skipped'

  log.info event: 'filter-compiled', kind: 'gsub',
    field: field, match: match, replace: replace

  lambda do |event|
    # Events without the field (nil/false value) pass through untouched.
    unless event[field]
      stats.inc 'filter-gsub-skipped'
      return event
    end

    # Apply the patterns in order and store the result as a new string.
    # Mutating in place (gsub!) would raise on frozen values and would
    # silently alter strings shared with other holders.
    event[field] = match.inject(event[field]) do |text, m|
      text.gsub m, replace
    end

    stats.inc 'filter-gsub'
    filtered event, conf
  end
end
#index(conf, stats, log) ⇒ Object
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/anschel/filter/index.rb', line 11
# Builds a filter that chooses the destination index for an event and stores
# it in +event[:_index]+ as "<prefix><suffix>".
#
# The prefix is +:prefix+ formatted against the event (String#%), the suffix
# is the event's timestamp rendered with +:suffix+ (strftime, UTC). The
# timestamp is read from the +:stamp+ field and parsed with the Joda-Time
# patterns in +:format+, tried in order. When the field is absent or no
# pattern parses it, the current time is used instead, a warning is logged
# and +:error_tag+ (if truthy) is appended to +event[:tags]+.
#
# +:stamp+, +:prefix+, +:suffix+, +:format+ and +:routing+ are deleted from
# +conf+; the remaining +conf+ is handed to +filtered+ (defined outside this
# block) together with each event.
#
# Requires JRuby: patterns are compiled through +org.joda.time+.
#
# @param conf [Hash] optional settings: +:stamp+ (default '@timestamp'),
#   +:prefix+ (default 'logs-%{type}-'), +:suffix+ (default '%Y.%m.%d'),
#   +:format+ (pattern or Array of patterns, default ISO8601 variants),
#   +:routing+ (format string for +event[:_routing]+), +:error_tag+
#   (default 'index-error')
# @param stats [Object] must respond to +create+ and +inc+
# @param log [Object] must respond to +info+ and +warn+
# @return [Proc] lambda taking an event Hash
def index conf, stats, log
  stamp  = conf.delete(:stamp)  || '@timestamp'
  prefix = conf.delete(:prefix) || 'logs-%{type}-'
  suffix = conf.delete(:suffix) || '%Y.%m.%d'
  format = conf.delete(:format) || %w[
    yyyy-MM-dd'T'HH:mm:ss.SSSZZ
    yyyy-MM-dd'T'HH:mm:ss.SSSZ
    yyyy-MM-dd'T'HH:mm:ssZZ
    yyyy-MM-dd'T'HH:mm:ssZ
  ] # ISO8601
  routing = conf.delete(:routing)

  # An explicit nil/false :error_tag disables tagging.
  error_tag = conf.has_key?(:error_tag) ? conf[:error_tag] : 'index-error'

  stamp = stamp.to_sym

  format = [ format ] unless format.is_a? Array

  joda = format.map do |f|
    j = org.joda.time.format.DateTimeFormat.forPattern f
    j.withDefaultYear(Time.new.year).withOffsetParsed
  end

  stats.create 'filter-index'
  stats.create 'filter-index-skipped'
  stats.create 'filter-index-error'

  log.info event: 'filter-compiled', kind: 'index',
    stamp: stamp, prefix: prefix, suffix: suffix, format: format, routing: routing

  lambda do |event|
    idx_prefix = prefix % event
    event[:_routing] = routing % event if routing
    stamped = event.has_key? stamp
    matched = false

    if stamped
      # First pattern that parses the stamp wins.
      joda.each do |j|
        begin
          millis = j.parseMillis event[stamp]
          idx_suffix = Time.at(0.001 * millis).utc.strftime(suffix)
          event[:_index] = idx_prefix + idx_suffix
          stats.inc 'filter-index'
          matched = true
        rescue java.lang.IllegalArgumentException
          # This pattern did not fit; try the next one.
        end
        break if matched
      end
    end

    return filtered(event, conf) if matched

    # Fallback: index by the current time, stamping the event only when it
    # had no stamp field at all.
    now = Time.now.utc
    event[stamp] = now.iso8601(3) unless stamped
    event[:_index] = idx_prefix + now.strftime(suffix)

    log.warn \
      event: 'filter-index-warning',
      reason: 'could not parse event',
      remediation: "sending to best-guess index '#{event[:_index]}'",
      stamp: stamp,
      prefix: prefix,
      suffix: suffix,
      format: format,
      raw_event: event

    if error_tag
      event[:tags] ||= []
      event[:tags] << error_tag
    end

    # The error path counts as both an error and an indexed event.
    stats.inc 'filter-index-error'
    stats.inc 'filter-index'
    filtered event, conf
  end
end
#parse(conf, stats, log) ⇒ Object
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/anschel/filter/parse.rb', line 9
# Builds a filter that matches a regular expression against one event field
# and copies every named capture group into the event (keyed by the group
# name as a Symbol).
#
# Events are skipped when the field is absent or when +:unless_field+ is
# already set. When the pattern does not match, a warning is logged and
# +:error_tag+ (if truthy) is appended to +event[:tags]+.
#
# +:field+, +:pattern+ and +:unless_field+ are deleted from +conf+; the
# remaining +conf+ is handed to +filtered+ (defined outside this block)
# together with each successfully parsed event.
#
# @param conf [Hash] settings; +:field+ and +:pattern+ are required,
#   +:unless_field+ defaults to '@timestamp', +:error_tag+ to 'parse-error'
# @param stats [Object] must respond to +create+ and +inc+
# @param log [Object] must respond to +info+ and +warn+
# @return [Proc] lambda taking an event Hash
# @raise [RuntimeError] when +:field+ or +:pattern+ is missing
def parse conf, stats, log
  field        = conf.delete :field
  pattern      = conf.delete :pattern
  unless_field = conf.delete(:unless_field) || '@timestamp'
  # An explicit nil/false :error_tag disables tagging.
  error_tag    = conf.has_key?(:error_tag) ? conf[:error_tag] : 'parse-error'

  raise 'Missing required "field" for "parse" filter' if field.nil?
  raise 'Missing required "pattern" for "parse" filter' if pattern.nil?

  field        = field.to_sym
  unless_field = unless_field.to_sym
  # Compile only after validation: Regexp.new(nil) raises a TypeError that
  # would hide the "Missing required" message above.
  pattern      = Regexp.new pattern

  stats.create 'filter-parse'
  stats.create 'filter-parse-skipped'
  stats.create 'filter-parse-error'

  log.info event: 'filter-compiled', kind: 'parse',
    field: field, pattern: pattern

  lambda do |event|
    unless event[field]
      stats.inc 'filter-parse-skipped'
      return event
    end

    if event[unless_field]
      stats.inc 'filter-parse-skipped'
      return event
    end

    mdata = pattern.match event[field]

    if mdata.nil?
      log.warn \
        event: 'parse-filter-error',
        reason: 'regexp did not match',
        field: field,
        pattern: pattern,
        raw_event: event
      stats.inc 'filter-parse-error'
      if error_tag
        event[:tags] ||= []
        event[:tags] << error_tag
      end
      return event
    end

    mdata.names.each do |group|
      event[group.to_sym] = mdata[group]
    end

    stats.inc 'filter-parse'
    filtered event, conf
  end
end
#scan(conf, stats, log) ⇒ Object
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/anschel/filter/scan.rb', line 10
# Builds a filter that collects every distinct match of a pattern in one
# event field and appends them to the list stored under the target field.
#
# +:field+, +:pattern+ and +:target+ are deleted from +conf+; the remaining
# +conf+ is handed to +filtered+ (defined outside this block) together with
# each event that produced at least one match.
#
# NOTE(review): the 'filter-scan-error' counter is registered but never
# incremented in this block.
#
# @param conf [Hash] settings; +:field+, +:pattern+ and +:target+ are required
# @param stats [Object] must respond to +create+ and +inc+
# @param log [Object] must respond to +info+
# @return [Proc] lambda taking an event Hash
# @raise [RuntimeError] when a required setting is missing
def scan conf, stats, log
  field   = conf.delete :field
  pattern = conf.delete :pattern
  target  = conf.delete :target

  raise 'Missing required "field" for "scan" filter' if field.nil?
  raise 'Missing required "pattern" for "scan" filter' if pattern.nil?
  raise 'Missing required "target" for "scan" filter' if target.nil?

  field  = field.to_sym
  target = target.to_sym
  match  = Regexp.new pattern

  stats.create 'filter-scan'
  stats.create 'filter-scan-skipped'
  stats.create 'filter-scan-nomatch'
  stats.create 'filter-scan-error'

  log.info event: 'filter-compiled', kind: 'scan',
    field: field, pattern: pattern, match: match, target: target

  lambda do |event|
    unless event[field]
      stats.inc 'filter-scan-skipped'
      return event
    end

    results = event[field].scan(match).flatten.uniq.map do |s|
      s.reverse.reverse # N.B. There seems to be some issue with the "scan"
                        #      function in JRuby wherein the matches are
                        #      shared across threads or somehow mangled.
                        #      The reverse.reverse here ensures that we
                        #      create a new object with the original
                        #      contents still intact. If you have a
                        #      better solution, please contact me!
    end

    if results.empty?
      stats.inc 'filter-scan-nomatch'
      event
    else
      event[target] ||= []
      event[target] += results
      stats.inc 'filter-scan'
      filtered event, conf
    end
  end
end
#stamp(conf, stats, log) ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/anschel/filter/stamp.rb', line 14
# Builds a filter that parses a timestamp out of one event field and stores
# it, as an ISO8601 UTC string, in the target field.
#
# The field is parsed with the Joda-Time patterns in +:pattern+, tried in
# order. When none of them parses it, the current time is stored instead, a
# warning is logged and +:error_tag+ (if truthy) is appended to
# +event[:tags]+. Events lacking the field are skipped. Events that already
# carry the target field only have that value re-rendered (via
# DateTime.parse) and are returned without going through +filtered+.
#
# +:utc?+, +:field+, +:pattern+, +:target+ and +:precision+ are deleted from
# +conf+; the remaining +conf+ is handed to +filtered+ (defined outside this
# block) together with each stamped event.
#
# Requires JRuby: patterns are compiled through +org.joda.time+.
#
# @param conf [Hash] settings; +:field+ and +:pattern+ (pattern or Array of
#   patterns) are required; +:target+ defaults to '@timestamp',
#   +:precision+ (fractional-second digits) to 3, +:error_tag+ to
#   'stamp-error'; a truthy +:utc?+ adds the local zone offset to the
#   parsed time
# @param stats [Object] must respond to +create+ and +inc+
# @param log [Object] must respond to +info+ and +warn+
# @return [Proc] lambda taking an event Hash
# @raise [RuntimeError] when +:field+ or +:pattern+ is missing
def stamp conf, stats, log
  utc       = conf.delete :utc?
  field     = conf.delete :field
  pattern   = conf.delete :pattern
  target    = conf.delete :target
  precision = conf.delete(:precision) || 3
  # An explicit nil/false :error_tag disables tagging.
  error_tag = conf.has_key?(:error_tag) ? conf[:error_tag] : 'stamp-error'

  raise 'Missing required "field" for "stamp" filter' if field.nil?
  raise 'Missing required "pattern" for "stamp" filter' if pattern.nil?

  patterns = pattern.is_a?(Array) ? pattern : [ pattern ]
  target ||= '@timestamp'

  field  = field.to_sym
  target = target.to_sym

  parsers = patterns.map do |p|
    joda = org.joda.time.format.DateTimeFormat.forPattern p
    joda = joda.withDefaultYear Time.new.year
    joda.withOffsetParsed
  end

  # Seconds added to every parsed time; computed once when the filter is built.
  offset_s = utc ? Time.zone_offset(Time.now.zone).to_f : 0.0

  stats.create 'filter-stamp'
  stats.create 'filter-stamp-skipped'
  stats.create 'filter-stamp-error'

  log.info event: 'filter-compiled', kind: 'stamp', \
    utc?: utc, field: field, pattern: pattern, target: target

  lambda do |event|
    unless event[field]
      stats.inc 'filter-stamp-skipped'
      return event
    end

    if event[target]
      log.warn \
        event: 'stamp-filter-warning',
        reason: 'event already has target field',
        utc?: utc,
        field: field,
        pattern: pattern,
        target: target,
        raw_event: event
      event[target] = \
        DateTime.parse(event[target]).to_time.utc.iso8601(precision)
      return event
    end

    matched = false
    parsers.each do |joda|
      begin
        millis = joda.parseMillis event[field]
        event[target] = Time.at(0.001 * millis + offset_s).utc.iso8601(precision)
        stats.inc 'filter-stamp'
        matched = true
      rescue
        # Deliberately best-effort: any failure with this pattern just
        # means the next pattern gets a try.
      end
      break if matched
    end

    return filtered(event, conf) if matched

    log.warn \
      event: 'stamp-filter-warning',
      reason: 'could not parse event',
      remediation: 'using current time for stamp',
      utc?: utc,
      field: field,
      pattern: pattern,
      target: target,
      raw_event: event

    if error_tag
      event[:tags] ||= []
      event[:tags] << error_tag
    end

    event[target] = Time.now.utc.iso8601(precision)
    stats.inc 'filter-stamp-error'
    filtered event, conf
  end
end
#tag(conf, stats, log) ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/anschel/filter/tag.rb', line 8
# Builds a filter that adds one or more tags to +event[:tags]+, creating the
# list when absent and removing duplicates afterwards.
#
# +:with+ is deleted from +conf+; the remaining +conf+ is handed to
# +filtered+ (defined outside this block) together with each tagged event.
#
# @param conf [Hash] settings; +:with+ (a tag or an Array of tags) is required
# @param stats [Object] must respond to +create+ and +inc+
# @param log [Object] must respond to +info+
# @return [Proc] lambda taking an event Hash
# @raise [RuntimeError] when +:with+ is missing
def tag conf, stats, log
  with = conf.delete :with

  raise 'Missing required "with" for "tag" filter' if with.nil?

  with = with.is_a?(Array) ? with : [ with ]

  stats.create 'filter-tag'

  log.info event: 'filter-compiled', kind: 'tag', with: with

  lambda do |event|
    event[:tags] ||= []
    event[:tags] += with
    # uniq! returns nil when nothing was removed, so it is called for its
    # side effect only and never used as a value.
    event[:tags].uniq!
    stats.inc 'filter-tag'
    filtered event, conf
  end
end