Class: Fluent::NotifierOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_notifier.rb

Defined Under Namespace

Classes: Definition, State, Test

Constant Summary collapse

NOTIFICATION_LEVELS =
['OK', 'WARN', 'CRIT', 'LOST'].freeze
STATES_CLEAN_INTERVAL =

1 hour

3600
STATES_EXPIRE_SECONDS =

4 hours

14400

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#defs ⇒ Object

Returns the value of attribute defs.



21
22
23
# File 'lib/fluent/plugin/out_notifier.rb', line 21

# Reader for @defs: the Array that #configure fills with one Definition
# per 'def' configuration section.
def defs
  @defs
end

#match_cache ⇒ Object

Returns the value of attribute match_cache.



21
22
23
# File 'lib/fluent/plugin/out_notifier.rb', line 21

# Reader for @match_cache: a Hash created empty by #configure, where it is
# described as a map of field name => definition(s). #check looks record
# keys up in it.
def match_cache
  @match_cache
end

#negative_cache ⇒ Object

Returns the value of attribute negative_cache.



21
22
23
# File 'lib/fluent/plugin/out_notifier.rb', line 21

# Reader for @negative_cache: a Hash in which #check stores `true` for every
# record key that matched no definition, so that key is skipped afterwards.
def negative_cache
  @negative_cache
end

#states ⇒ Object

Returns the value of attribute states.



21
22
23
# File 'lib/fluent/plugin/out_notifier.rb', line 21

# Reader for @states: a Hash of State objects keyed by each notification's
# :hashkey value. Entries are added by #suppressed_emit and removed by
# #states_cleanup.
def states
  @states
end

#tests ⇒ Object

Returns the value of attribute tests.



21
22
23
# File 'lib/fluent/plugin/out_notifier.rb', line 21

# Reader for @tests: the Array that #configure fills with one Test per
# 'test' configuration section. #check only evaluates a definition when
# every one of these tests passes for the record.
def tests
  @tests
end

Instance Method Details

#check(tag, es) ⇒ Object



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/fluent/plugin/out_notifier.rb', line 152

# Runs every record of an event stream through the configured definitions
# and collects the alerts they produce.
#
# Per record key, the list of matching definitions is looked up in
# @match_cache; keys known to match nothing are skipped via @negative_cache.
# Both caches are filled lazily here, so Definition#match? is evaluated at
# most once per distinct key.
#
# @param tag [String] tag of the incoming events; the configured
#   input_tag_remove_prefix (plus '.') is stripped when the tag starts with
#   it and is longer than it
# @param es [#each] event stream yielding (time, record) pairs
# @return [Array] the truthy values returned by Definition#check, in
#   record/key/definition order
def check(tag, es)
  notifications = []

  tag = if @input_tag_remove_prefix &&
           tag.start_with?(@input_tag_remove_prefix_string) &&
           tag.length > @input_tag_remove_prefix_length
          tag[@input_tag_remove_prefix_length..-1]
        else
          tag
        end

  es.each do |time, record|
    record.keys.each do |key|
      next if @negative_cache[key]

      defs = @match_cache[key]
      unless defs
        defs = @defs.select { |d| d.match?(key) }
        if defs.empty?
          @negative_cache[key] = true
          next
        end
        # Remember the positive result; without this the lookup above never
        # hits and every definition is re-matched for every record.
        @match_cache[key] = defs
      end

      defs.each do |d|
        # all? stops at the first failing test, like the former
        # `reduce(true) { |r, t| r and t.test(...) }`.
        next unless @tests.all? { |t| t.test(tag, record) }

        alert = d.check(tag, time, record, key)
        notifications.push(alert) if alert
      end
    end
  end

  notifications
end

#configure(conf) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/fluent/plugin/out_notifier.rb', line 82

# Resets the plugin's runtime state and builds Test / Definition objects
# from the child sections of the configuration.
#
# @param conf [#elements] configuration whose child elements are named
#   'test' or 'def'
# @raise [Fluent::ConfigError] when a child element has any other name
def configure(conf)
  super

  @tests = []
  @defs = []
  @states = {} # key: tag+field ?
  @match_cache = {} # cache which has map (fieldname => definition(s))
  @negative_cache = {}

  if @input_tag_remove_prefix
    # Keep the prefix with its trailing dot, and that string's length, ready
    # for the tag stripping done in #check.
    @input_tag_remove_prefix_string = @input_tag_remove_prefix + '.'
    @input_tag_remove_prefix_length = @input_tag_remove_prefix_string.length
  end

  # One shared hash of plugin-level fallbacks handed to every Definition.
  defaults = {
    :tag => @default_tag,
    :tag_warn => @default_tag_warn,
    :tag_crit => @default_tag_crit,
    :interval_1st => @default_interval_1st,
    :repetitions_1st => @default_repetitions_1st,
    :interval_2nd => @default_interval_2nd,
    :repetitions_2nd => @default_repetitions_2nd,
    :interval_3rd => @default_interval_3rd,
  }

  conf.elements.each do |section|
    section_name = section.name
    if 'test' == section_name
      @tests.push(Test.new(section))
    elsif 'def' == section_name
      @defs.push(Definition.new(section, defaults))
    else
      raise Fluent::ConfigError, "invalid section name for out_notifier: #{section_name}"
    end
  end
end

#emit(tag, es, chain) ⇒ Object



189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/fluent/plugin/out_notifier.rb', line 189

# Output entry point: checks the event stream, emits any resulting
# notifications (subject to suppression), prunes stale states once the
# cleanup interval has passed, then hands control to the next chain element.
#
# @param tag [String] tag of the incoming events
# @param es [#each] event stream, passed to #check
# @param chain [#next] output chain; advanced exactly once per call
def emit(tag, es, chain)
  alerts = check(tag, es)

  # State bookkeeping happens under the mutex.
  @mutex.synchronize { suppressed_emit(alerts) } unless alerts.empty?

  since_cleanup = Fluent::Engine.now - @last_status_cleaned
  if since_cleanup > STATES_CLEAN_INTERVAL
    @mutex.synchronize do
      states_cleanup
      @last_status_cleaned = Fluent::Engine.now
    end
  end

  chain.next
end

#start ⇒ Object



115
116
117
118
119
# File 'lib/fluent/plugin/out_notifier.rb', line 115

# Runs the parent class's start, then prepares the state #emit relies on:
# a mutex and the timestamp of the last states cleanup.
def start
  super
  # Held by #emit around its suppressed_emit and states_cleanup calls.
  @mutex = Mutex.new
  # Baseline for the STATES_CLEAN_INTERVAL comparison in #emit.
  @last_status_cleaned = Fluent::Engine.now
end

#states_cleanup ⇒ Object



143
144
145
146
147
148
149
150
# File 'lib/fluent/plugin/out_notifier.rb', line 143

# Drops every entry of @states whose last notification is more than
# STATES_EXPIRE_SECONDS older than the current engine time.
def states_cleanup
  current = Fluent::Engine.now
  # Iterate over a snapshot of the keys so entries can be deleted in the loop.
  @states.keys.each do |hashkey|
    idle_for = current - @states[hashkey].last_notified
    next unless idle_for > STATES_EXPIRE_SECONDS

    @states.delete(hashkey)
  end
end

#suppressed_emit(notifications) ⇒ Object

def shutdown end



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/fluent/plugin/out_notifier.rb', line 124

# Emits each notification unless the State tracked for it says to suppress it.
#
# The bookkeeping entries :hashkey, :match_def and :emit_tag are removed from
# each notification (mutating it) before the remainder is emitted as the
# record. A notification with no tracked state is always emitted and gets a
# new State; otherwise it is emitted only when State#suppress? is falsy, after
# which State#update_notified is called.
#
# @param notifications [Array<Hash>] alerts as collected by #check
def suppressed_emit(notifications)
  notifications.each do |notice|
    state_key = notice.delete(:hashkey)
    matched_def = notice.delete(:match_def)
    out_tag = notice.delete(:emit_tag)

    tracked = @states[state_key]

    unless tracked
      # First sighting: always emit and start tracking.
      Fluent::Engine.emit(out_tag, Fluent::Engine.now, notice)
      @states[state_key] = State.new(notice)
      next
    end

    next if tracked.suppress?(matched_def, notice)

    Fluent::Engine.emit(out_tag, Fluent::Engine.now, notice)
    tracked.update_notified(matched_def, notice)
  end
end