Class: Anschel::Filter

Inherits:
Object
  • Object
show all
Defined in:
lib/anschel/filter.rb,
lib/anschel/filter/tag.rb,
lib/anschel/filter/gsub.rb,
lib/anschel/filter/scan.rb,
lib/anschel/filter/debug.rb,
lib/anschel/filter/index.rb,
lib/anschel/filter/parse.rb,
lib/anschel/filter/stamp.rb,
lib/anschel/filter/convert.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config, stats, log) ⇒ Filter

Returns a new instance of Filter.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/anschel/filter.rb', line 16

# Builds the per-event-type filter chains described by +config+.
#
# Each entry of +config+ maps an event type to a list of single-key hashes of
# the form { filter_type => filter_conf }. For every such entry the method
# named +filter_type+ is invoked on this object with (filter_conf, stats, log)
# and its result is appended to the chain for that event type.
#
# @param config [Hash, nil] filter definitions; nil is treated as empty
# @param stats  [Object] passed through untouched to each filter builder
# @param log    [Object] logger; must respond to #info
def initialize config, stats, log
  log.info event: 'filter-loading'
  log.info event: 'filter-config', config: config

  # Unknown event types read back as an empty (and memoized) chain.
  @filters = Hash.new { |chains, type| chains[type] = [] }

  (config || {}).each do |event_type, filter_defns|
    filter_defns.each do |filter_defn|
      builder = filter_defn.keys.first
      @filters[event_type].push send(builder, filter_defn[builder], stats, log)
    end
  end

  log.info event: 'filter-fully-loaded'
end

Instance Attribute Details

#filters ⇒ Object (readonly)

Returns the value of attribute filters.



14
15
16
# File 'lib/anschel/filter.rb', line 14

# Read-only accessor: returns the @filters instance variable as-is (no copy).
def filters
  @filters
end

Instance Method Details

#apply(event) ⇒ Object



32
33
34
35
36
37
38
39
# File 'lib/anschel/filter.rb', line 32

# Runs +event+ through the :_before chain, then the chain registered for the
# event's own type, then the :_after chain. Each filter is invoked with #call.
#
# @param event [Hash] must carry a truthy :type entry
# @return [Hash] the same event object that was passed in
# @raise [RuntimeError] when the event has no :type
def apply event
  kind = event[:type]
  raise 'Event does not have a "type" field' unless kind

  # Resolve the chain name up front so a filter that rewrites :type cannot
  # change which chain runs for this event.
  kind = kind.to_sym

  [ :_before, kind, :_after ].each do |chain|
    filters[chain].each { |step| step.call event }
  end

  event
end

#convert(conf, stats, log) ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/anschel/filter/convert.rb', line 9

# Builds a "convert" filter that coerces one event field to another type.
#
# Options consumed (deleted) from +conf+:
#   :field - name of the event field to convert (required)
#   :type  - 'integer', 'float' or 'string' (required; a Symbol is accepted too)
# Whatever remains in +conf+ is handed to +filtered+ (defined outside this
# block) together with the event after a successful conversion.
#
# The conversion is resolved once, here, so an unsupported :type fails when
# the filter is built instead of raising on every event.
#
# @param conf  [Hash] filter options (mutated: :field and :type are removed)
# @param stats [Object] must respond to #create and #inc
# @param log   [Object] must respond to #info
# @return [Proc] lambda taking an event Hash; returns the event untouched when
#   it has no such key, otherwise the result of +filtered+
# @raise [RuntimeError] when :field or :type is missing, or :type is unsupported
def convert conf, stats, log
  field = conf.delete :field
  type  = conf.delete :type

  raise 'Missing required "field" for "convert" filter' if field.nil?
  raise 'Missing required "type" for "convert" filter' if type.nil?

  field = field.to_sym

  type_conversions = {
    'integer' => :to_i,
    'float'   => :to_f,
    'string'  => :to_s
  }

  conversion = type_conversions[type.to_s]
  if conversion.nil?
    raise "Unsupported \"type\" for \"convert\" filter: #{type.inspect}"
  end

  stats.create 'filter-convert'
  stats.create 'filter-convert-skipped'

  log.info event: 'filter-compiled', kind: 'convert', \
    field: field, type: type

  lambda do |event|
    unless event.has_key? field
      stats.inc 'filter-convert-skipped'
      return event
    end

    event[field] = event[field].send conversion
    stats.inc 'filter-convert'
    filtered event, conf
  end
end

#debug(conf, stats, log) ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
17
18
19
# File 'lib/anschel/filter/debug.rb', line 6

# Builds a "debug" filter: a pass-through step that logs every event it sees
# at debug level (both its #inspect string and the event itself).
#
# @param conf  [Object] not used by this filter
# @param stats [Object] not used by this filter
# @param log   [Object] must respond to #info and #debug
# @return [Proc] lambda taking an event and returning that same event
def debug conf, stats, log
  log.info event: 'filter-compiled', kind: 'debug'

  ->(event) do
    event.tap do |seen|
      log.debug(event: 'debug', event_repr: seen.inspect, raw_event: seen)
    end
  end
end

#gsub(conf, stats, log) ⇒ Object



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/anschel/filter/gsub.rb', line 10

# Builds a "gsub" filter that rewrites one event field by replacing every
# match of one or more patterns.
#
# Options consumed (deleted) from +conf+:
#   :field   - name of the event field to rewrite (required)
#   :match   - a pattern or an Array of patterns, each compiled with
#              Regexp.new (required)
#   :replace - replacement passed to String#gsub (required)
# Whatever remains in +conf+ is handed to +filtered+ (defined outside this
# block) together with the event after the substitution.
#
# The substitution is non-destructive: the field is reassigned to a new
# String, so frozen values work and a String object shared with another
# holder is never modified behind its back.
#
# @param conf  [Hash] filter options (mutated: the three options are removed)
# @param stats [Object] must respond to #create and #inc
# @param log   [Object] must respond to #info
# @return [Proc] lambda taking an event Hash; returns the event untouched when
#   the field is missing or falsy, otherwise the result of +filtered+
# @raise [RuntimeError] when :field, :match or :replace is missing
def gsub conf, stats, log
  field   = conf.delete :field
  match   = conf.delete :match
  replace = conf.delete :replace

  raise 'Missing required "field" for "gsub" filter' if field.nil?
  raise 'Missing required "match" for "gsub" filter' if match.nil?
  raise 'Missing required "replace" for "gsub" filter' if replace.nil?

  field = field.to_sym

  match = match.is_a?(Array) ? match : [ match ]
  match = match.map { |m| Regexp.new m }

  stats.create 'filter-gsub'
  stats.create 'filter-gsub-skipped'

  log.info event: 'filter-compiled', kind: 'gsub', \
    field: field, match: match, replace: replace

  lambda do |event|
    unless event[field]
      stats.inc 'filter-gsub-skipped'
      return event
    end

    # Apply the patterns in order, each one to the output of the previous.
    event[field] = match.reduce(event[field]) do |text, m|
      text.gsub m, replace
    end
    stats.inc 'filter-gsub'
    filtered event, conf
  end
end

#index(conf, stats, log) ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/anschel/filter/index.rb', line 11

# Builds an "index" filter that sets event[:_index] (and optionally
# event[:_routing]) from the event's timestamp field.
#
# Options consumed (deleted) from +conf+, with their defaults:
#   :stamp   - event field holding the timestamp ('@timestamp')
#   :prefix  - format string applied to the event with String#% ('logs-%{type}-')
#   :suffix  - strftime pattern appended to the prefix ('%Y.%m.%d')
#   :format  - one or more Joda-Time DateTimeFormat patterns used to parse the
#              stamp (four ISO8601 variants)
#   :routing - optional format string; when given, event[:_routing] is set to
#              it formatted with the event
# :error_tag is read but left in +conf+; it defaults to 'index-error' and may
# be set to nil/false to disable tagging.
#
# When the stamp is missing or no pattern parses it, the current UTC time is
# used for the index suffix (and stored as the stamp if it was missing), a
# warning is logged, the error tag is appended to event[:tags], and both
# 'filter-index-error' and 'filter-index' are incremented.
#
# NOTE: the 'filter-index-skipped' counter is created here but nothing in this
# block increments it.
#
# @param conf  [Hash] filter options (mutated as described above)
# @param stats [Object] must respond to #create and #inc
# @param log   [Object] must respond to #info and #warn
# @return [Proc] lambda taking an event Hash and returning the result of
#   +filtered+ (defined outside this block)
def index conf, stats, log
  stamp  = conf.delete(:stamp)  || '@timestamp'
  prefix = conf.delete(:prefix) || 'logs-%{type}-'
  suffix = conf.delete(:suffix) || '%Y.%m.%d'
  format = conf.delete(:format) || %w[
    yyyy-MM-dd'T'HH:mm:ss.SSSZZ
    yyyy-MM-dd'T'HH:mm:ss.SSSZ
    yyyy-MM-dd'T'HH:mm:ssZZ
    yyyy-MM-dd'T'HH:mm:ssZ
  ] # ISO8601
  routing = conf.delete(:routing)

  error_tag = conf.has_key?(:error_tag) ? conf[:error_tag] : 'index-error'

  stamp = stamp.to_sym

  format = [ format ] unless format.is_a? Array

  joda = format.map do |f|
    j = org.joda.time.format.DateTimeFormat.forPattern f
    j.withDefaultYear(Time.new.year).withOffsetParsed
  end

  stats.create 'filter-index'
  stats.create 'filter-index-skipped'
  stats.create 'filter-index-error'

  log.info event: 'filter-compiled', kind: 'index', \
    stamp: stamp, prefix: prefix, suffix: suffix, format: format, routing: routing

  lambda do |event|
    idx_prefix = prefix % event

    event[:_routing] = routing % event if routing

    stamped = event.has_key? stamp
    matched = false

    if stamped
      # Try each pattern in order; the first one that parses wins.
      joda.each do |j|
        begin
          millis = j.parseMillis event[stamp]
          idx_suffix = Time.at(0.001 * millis).utc.strftime(suffix)
          event[:_index] = idx_prefix + idx_suffix
          stats.inc 'filter-index'
          matched = true
        rescue java.lang.IllegalArgumentException
          # This pattern did not fit the stamp; fall through to the next one.
        end
        break if matched
      end
    end

    return filtered(event, conf) if matched

    # Fallback: no usable timestamp, so index by the current time instead.
    timestamp = Time.now.utc
    event[stamp] = timestamp.iso8601(3) unless stamped
    event[:_index] = idx_prefix + timestamp.strftime(suffix)

    log.warn \
      event: 'filter-index-warning',
      reason: 'could not parse event',
      remediation: "sending to best-guess index '#{event[:_index]}'",
      stamp: stamp,
      prefix: prefix,
      suffix: suffix,
      format: format,
      raw_event: event

    if error_tag
      event[:tags] ||= []
      event[:tags] << error_tag
    end

    stats.inc 'filter-index-error'
    stats.inc 'filter-index'
    filtered event, conf
  end
end

#parse(conf, stats, log) ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/anschel/filter/parse.rb', line 9

# Builds a "parse" filter that matches one event field against a regular
# expression and copies every named capture group into the event.
#
# Options consumed (deleted) from +conf+:
#   :field        - name of the event field to match (required)
#   :pattern      - regular expression source, compiled with Regexp.new
#                   (required)
#   :unless_field - when the event already has a truthy value for this field
#                   the filter is skipped (default '@timestamp')
# :error_tag is read but left in +conf+; it defaults to 'parse-error' and may
# be set to nil/false to disable tagging.
#
# The required options are validated before the pattern is compiled, so a
# missing :pattern produces the "Missing required" error rather than a
# TypeError from Regexp.new(nil).
#
# @param conf  [Hash] filter options (mutated as described above)
# @param stats [Object] must respond to #create and #inc
# @param log   [Object] must respond to #info and #warn
# @return [Proc] lambda taking an event Hash; returns the event directly when
#   skipped or when the pattern does not match (after tagging it), otherwise
#   the result of +filtered+ (defined outside this block)
# @raise [RuntimeError] when :field or :pattern is missing
def parse conf, stats, log
  field        = conf.delete :field
  pattern      = conf.delete :pattern
  unless_field = conf.delete(:unless_field) || '@timestamp'

  error_tag = conf.has_key?(:error_tag) ? conf[:error_tag] : 'parse-error'

  raise 'Missing required "field" for "parse" filter' if field.nil?
  raise 'Missing required "pattern" for "parse" filter' if pattern.nil?

  pattern      = Regexp.new pattern
  field        = field.to_sym
  unless_field = unless_field.to_sym

  stats.create 'filter-parse'
  stats.create 'filter-parse-skipped'
  stats.create 'filter-parse-error'

  log.info event: 'filter-compiled', kind: 'parse', \
    field: field, pattern: pattern

  lambda do |event|
    unless event[field]
      stats.inc 'filter-parse-skipped'
      return event
    end

    if event[unless_field]
      stats.inc 'filter-parse-skipped'
      return event
    end

    mdata = pattern.match event[field]

    if mdata.nil?
      log.warn \
        event: 'parse-filter-error',
        reason: 'regexp did not match',
        field: field,
        pattern: pattern,
        raw_event: event
      stats.inc 'filter-parse-error'
      if error_tag
        event[:tags] ||= []
        event[:tags] << error_tag
      end
      return event
    end

    # Only named groups are copied; each becomes a Symbol-keyed event field.
    mdata.names.each do |group|
      event[group.to_sym] = mdata[group]
    end

    stats.inc 'filter-parse'
    filtered event, conf
  end
end

#scan(conf, stats, log) ⇒ Object



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/anschel/filter/scan.rb', line 10

# Builds a "scan" filter that collects every match of a pattern in one event
# field and appends the unique matches to a target field.
#
# Options consumed (deleted) from +conf+:
#   :field   - name of the event field to scan (required)
#   :pattern - regular expression source, compiled with Regexp.new (required)
#   :target  - name of the event field that receives the matches (required)
# Whatever remains in +conf+ is handed to +filtered+ (defined outside this
# block) together with the event when at least one match was found.
#
# NOTE: the 'filter-scan-error' counter is created here but nothing in this
# block increments it.
#
# @param conf  [Hash] filter options (mutated: the three options are removed)
# @param stats [Object] must respond to #create and #inc
# @param log   [Object] must respond to #info
# @return [Proc] lambda taking an event Hash; returns the event directly when
#   the field is missing/falsy or nothing matched, otherwise the result of
#   +filtered+
# @raise [RuntimeError] when :field, :pattern or :target is missing
def scan conf, stats, log
  field   = conf.delete :field
  pattern = conf.delete :pattern
  target  = conf.delete :target

  raise 'Missing required "field" for "scan" filter' if field.nil?
  raise 'Missing required "pattern" for "scan" filter' if pattern.nil?
  raise 'Missing required "target" for "scan" filter' if target.nil?

  field  = field.to_sym
  target = target.to_sym
  match  = Regexp.new pattern

  stats.create 'filter-scan'
  stats.create 'filter-scan-skipped'
  stats.create 'filter-scan-nomatch'
  stats.create 'filter-scan-error'

  log.info event: 'filter-compiled', kind: 'scan', \
    field: field, pattern: pattern, match: match, target: target

  lambda do |event|
    unless event[field]
      stats.inc 'filter-scan-skipped'
      return event
    end

    results = event[field].scan(match).flatten.uniq.map do |s|
      # N.B. (kept from the original author) There seems to be some issue
      # with "scan" in JRuby wherein the matches are shared across threads or
      # somehow mangled. reverse.reverse yields a brand-new String with the
      # original contents intact.
      s.reverse.reverse
    end

    if results.empty?
      stats.inc 'filter-scan-nomatch'
      event
    else
      event[target] ||= []
      event[target]  += results
      stats.inc 'filter-scan'
      filtered event, conf
    end
  end
end

#stamp(conf, stats, log) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/anschel/filter/stamp.rb', line 14

# Builds a "stamp" filter that parses a timestamp out of one event field and
# stores it, as an ISO8601 UTC string, in a target field.
#
# Options consumed (deleted) from +conf+:
#   :utc?      - when truthy, the local zone offset (in seconds, as reported by
#                Time.zone_offset at build time) is added to the parsed time
#   :field     - name of the event field holding the raw timestamp (required)
#   :pattern   - one or more Joda-Time DateTimeFormat patterns (required)
#   :target    - name of the event field to write (default '@timestamp')
#   :precision - fractional-second digits in the output (default 3)
# :error_tag is read but left in +conf+; it defaults to 'stamp-error' and may
# be set to nil/false to disable tagging.
#
# Per event:
# * field missing/falsy          -> counted as skipped, event returned as-is
# * target already present       -> warning logged, the existing value is
#                                   re-parsed with DateTime.parse and
#                                   normalised to UTC ISO8601, event returned
# * some pattern parses the field -> target set, +filtered+ result returned
# * no pattern parses the field   -> warning logged, error tag appended,
#                                   target set to the current time,
#                                   +filtered+ result returned
#
# @param conf  [Hash] filter options (mutated as described above)
# @param stats [Object] must respond to #create and #inc
# @param log   [Object] must respond to #info and #warn
# @return [Proc] lambda taking an event Hash
# @raise [RuntimeError] when :field or :pattern is missing
def stamp conf, stats, log
  utc       = conf.delete :utc?
  field     = conf.delete :field
  pattern   = conf.delete :pattern
  target    = conf.delete :target
  precision = conf.delete(:precision) || 3

  error_tag = conf.has_key?(:error_tag) ? conf[:error_tag] : 'stamp-error'

  raise 'Missing required "field" for "stamp" filter' if field.nil?
  raise 'Missing required "pattern" for "stamp" filter' if pattern.nil?

  patterns = pattern.is_a?(Array) ? pattern : [ pattern ]
  target ||= '@timestamp'

  field  = field.to_sym
  target = target.to_sym

  parsers = patterns.map do |source|
    org.joda.time.format.DateTimeFormat.forPattern(source)
       .withDefaultYear(Time.new.year)
       .withOffsetParsed
  end

  offset_s = utc ? Time.zone_offset(Time.now.zone).to_f : 0.0

  stats.create 'filter-stamp'
  stats.create 'filter-stamp-skipped'
  stats.create 'filter-stamp-error'

  log.info event: 'filter-compiled', kind: 'stamp', \
    utc?: utc, field: field, pattern: pattern, target: target

  lambda do |event|
    unless event[field]
      stats.inc 'filter-stamp-skipped'
      return event
    end

    if event[target]
      log.warn \
        event: 'stamp-filter-warning',
        reason: 'event already has target field',
        utc?: utc,
        field: field,
        pattern: pattern,
        target: target,
        raw_event: event
      event[target] = \
        DateTime.parse(event[target]).to_time.utc.iso8601(precision)
      return event
    end

    # Try each parser in order; the first one that succeeds wins. Any
    # StandardError from a parser just means "try the next pattern".
    matched = false
    parsers.each do |joda|
      begin
        millis = joda.parseMillis event[field]
        event[target] = Time.at(0.001 * millis + offset_s).utc.iso8601(precision)
        stats.inc 'filter-stamp'
        matched = true
      rescue
      end
      break if matched
    end

    return filtered(event, conf) if matched

    log.warn \
      event: 'stamp-filter-warning',
      reason: 'could not parse event',
      remediation: 'using current time for stamp',
      utc?: utc,
      field: field,
      pattern: pattern,
      target: target,
      raw_event: event

    if error_tag
      event[:tags] ||= []
      event[:tags] << error_tag
    end

    event[target] = Time.now.utc.iso8601(precision)
    stats.inc 'filter-stamp-error'
    filtered event, conf
  end
end

#tag(conf, stats, log) ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/anschel/filter/tag.rb', line 8

# Builds a "tag" filter that merges a fixed set of tags into event[:tags],
# keeping existing tags first and dropping duplicates.
#
# Options consumed (deleted) from +conf+:
#   :with - a tag or an Array of tags to add (required)
# Whatever remains in +conf+ is handed to +filtered+ (defined outside this
# block) together with the event.
#
# @param conf  [Hash] filter options (mutated: :with is removed)
# @param stats [Object] must respond to #create and #inc
# @param log   [Object] must respond to #info
# @return [Proc] lambda taking an event Hash and returning the result of
#   +filtered+
# @raise [RuntimeError] when :with is missing
def tag conf, stats, log
  tags = conf.delete :with
  raise 'Missing required "with" for "tag" filter' if tags.nil?

  tags = [ tags ] unless tags.is_a?(Array)

  stats.create 'filter-tag'
  log.info event: 'filter-compiled', kind: 'tag', with: tags

  ->(event) do
    current = event[:tags] || []
    event[:tags] = (current + tags).uniq
    stats.inc 'filter-tag'
    filtered event, conf
  end
end