Class: Fluent::SuppressOutput

Inherits:
Output
  • Object
show all
Includes:
HandleTagNameMixin
Defined in:
lib/fluent/plugin/out_suppress.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



16
17
18
19
20
21
22
23
24
25
# File 'lib/fluent/plugin/out_suppress.rb', line 16

def configure(conf)
  super

  if ( !@remove_tag_prefix && !@remove_tag_suffix && !@add_tag_prefix && !@add_tag_suffix )
    raise ConfigError, "out_suppress: Set remove_tag_prefix, remove_tag_suffix, add_tag_prefix or add_tag_suffix."
  end

  @keys  = @attr_keys ? @attr_keys.split(/ *, */) : nil
  @slots = {}
end

#emit(tag, es, chain) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/fluent/plugin/out_suppress.rb', line 35

def emit(tag, es, chain)
  es.each do |time, record|
    if @keys
      keys = @keys.map do |key|
        key.split(/\./).inject(record) {|r, k| r[k] }
      end
      key = tag + "\0" + keys.join("\0")
    else
      key = tag
    end
    slot = @slots[key] ||= []

    # expire old records time
    expired = time.to_f - @interval
    while slot.first && (slot.first <= expired)
      slot.shift
    end

    if slot.length >= @num
      log.debug "suppressed record: #{record.to_json}"
      next
    end

    slot.push(time.to_f)
    _tag = tag.clone
    filter_record(_tag, time, record)
    if tag != _tag
      Engine.emit(_tag, time, record)
    else
      log.warn "Drop record #{record} tag '#{tag}' was not replaced. Can't emit record, cause infinity looping. Set remove_tag_prefix, remove_tag_suffix, add_tag_prefix or add_tag_suffix correctly."
    end
  end

  chain.next
end

#shutdownObject



31
32
33
# File 'lib/fluent/plugin/out_suppress.rb', line 31

def shutdown
  super
end

#startObject



27
28
29
# File 'lib/fluent/plugin/out_suppress.rb', line 27

def start
  super
end