Class: Fluent::Plugin::NotifierOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_notifier.rb

Defined Under Namespace

Classes: Definition, State, Test

Constant Summary collapse

NOTIFICATION_LEVELS =
['OK', 'WARN', 'CRIT', 'LOST'].freeze
STATES_CLEAN_INTERVAL =

1 hour

3600
STATES_EXPIRE_SECONDS =

4 hours

14400

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#defs ⇒ Object

Returns the value of attribute defs.



67
68
69
# File 'lib/fluent/plugin/out_notifier.rb', line 67

# Reader for the +@defs+ instance variable.
#
# @return [Object, nil] whatever is currently stored in +@defs+
def defs; @defs; end

#match_cache ⇒ Object

Returns the value of attribute match_cache.



67
68
69
# File 'lib/fluent/plugin/out_notifier.rb', line 67

# Reader for the +@match_cache+ instance variable.
#
# @return [Object, nil] whatever is currently stored in +@match_cache+
def match_cache; @match_cache; end

#negative_cache ⇒ Object

Returns the value of attribute negative_cache.



67
68
69
# File 'lib/fluent/plugin/out_notifier.rb', line 67

# Reader for the +@negative_cache+ instance variable.
#
# @return [Object, nil] whatever is currently stored in +@negative_cache+
def negative_cache; @negative_cache; end

#states ⇒ Object

Returns the value of attribute states.



67
68
69
# File 'lib/fluent/plugin/out_notifier.rb', line 67

# Reader for the +@states+ instance variable.
#
# @return [Object, nil] whatever is currently stored in +@states+
def states; @states; end

#tests ⇒ Object

Returns the value of attribute tests.



67
68
69
# File 'lib/fluent/plugin/out_notifier.rb', line 67

# Reader for the +@tests+ instance variable.
#
# @return [Object, nil] whatever is currently stored in +@tests+
def tests; @tests; end

Instance Method Details

#check(tag, es) ⇒ Object



198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# File 'lib/fluent/plugin/out_notifier.rb', line 198

# Scans every record of an event stream and collects the alerts produced by
# the definitions in +@defs+ whose +match?+ accepts one of the record's keys.
#
# Lookups are memoized per key: keys that match no definition are remembered
# in +@negative_cache+, and keys that match at least one definition have
# their list of definitions stored in +@match_cache+, so +@defs+ is scanned
# at most once per distinct key.
#
# @param tag [String] tag of the incoming events; when
#   +@input_tag_remove_prefix+ is set and the tag starts with the prefix
#   string, the prefix is stripped before the tag is passed on
# @param es [#each] event stream yielding (time, record) pairs
# @return [Array] every truthy value returned by a definition's +check+,
#   in the order they were produced
def check(tag, es)
  notifications = []

  # Strip the prefix only when something is left of the tag afterwards.
  if @input_tag_remove_prefix &&
     tag.start_with?(@input_tag_remove_prefix_string) &&
     tag.length > @input_tag_remove_prefix_length
    tag = tag[@input_tag_remove_prefix_length..-1]
  end

  es.each do |time, record|
    # Iterate over a snapshot of the keys, as the original code did.
    record.keys.each do |key|
      next if @negative_cache[key]

      defs = @match_cache[key]
      unless defs
        defs = @defs.select { |d| d.match?(key) }
        if defs.empty?
          @negative_cache[key] = true
          next
        end
        # Remember the positive result too; previously this cache was read
        # but never filled, so matching keys were re-scanned on every record.
        @match_cache[key] = defs
      end

      defs.each do |d|
        # A record is only checked when every configured test accepts it.
        next unless @tests.all? { |t| t.test(tag, record) }

        alert = d.check(tag, time, record, key)
        notifications.push(alert) if alert
      end
    end
  end

  notifications
end

#configure(conf) ⇒ Object

<match httpstatus.blog>

type notifier
default_tag notification
default_interval_1st 1m
default_repetitions_1st 5
default_interval_2nd 5m
default_repetitions_2nd 5
default_interval_3rd 30m
<test>
  check numeric
  target_key xxx
  lower_threshold xxx
  upper_threshold xxx
</test>
<test>
  check regexp
  target_key xxx
  include_pattern ^.$
  exclude_pattern ^.$
</test>
<def>
  pattern http_status_errors
  check numeric_upward
  warn_threshold 25
  crit_threshold 50
  tag alert

  # tag_warn alert.warn
  # tag_crit alert.crit

  # target_keys blog_5xx_percentage
  target_key_pattern ^.*_5xx_percentage$
</def>
<def>
  pattern log_checker
  check string_find
  crit_pattern 'ERROR'
  warn_pattern 'WARNING'
  tag alert
  # target_keys message
  target_key_pattern ^.*_message$
</def>

</match>



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/fluent/plugin/out_notifier.rb', line 123

# Builds the plugin's runtime state from its parsed configuration.
#
# After delegating to the parent implementation this resets the caches and
# collections, derives the tag-prefix helpers, merges any per-slot interval
# and repetition overrides into the default arrays, and instantiates one
# Test per entry of +@test_configs+ and one Definition per entry of
# +@def_configs+.
#
# @param conf [Object] configuration object, forwarded unchanged to +super+
# @return [Object] the receiver of the final iteration (+@def_configs+)
def configure(conf)
  super

  @match_cache = {} # cache which has map (fieldname => definition(s))
  @negative_cache = {}
  @tests = []
  @defs = []
  @states = {} # key: tag+field ?

  if (prefix = @input_tag_remove_prefix)
    # Pre-compute "prefix." and its length for later tag stripping.
    dotted_prefix = prefix + '.'
    @input_tag_remove_prefix_string = dotted_prefix
    @input_tag_remove_prefix_length = dotted_prefix.length
  end

  # An explicitly set slot wins; unset slots keep the current default.
  interval_overrides = [@default_interval_1st, @default_interval_2nd, @default_interval_3rd]
  if interval_overrides.any?
    @default_intervals = interval_overrides.each_with_index.map do |override, slot|
      override || @default_intervals[slot]
    end
  end

  repetition_overrides = [@default_repetitions_1st, @default_repetitions_2nd]
  if repetition_overrides.any?
    @default_repetitions = repetition_overrides.each_with_index.map do |override, slot|
      override || @default_repetitions[slot]
    end
  end

  @test_configs.each { |test_config| @tests.push(Test.new(test_config)) }
  @def_configs.each { |def_config| @defs.push(Definition.new(def_config, self)) }
end

#multi_workers_ready? ⇒ Boolean



159
160
161
# File 'lib/fluent/plugin/out_notifier.rb', line 159

# Predicate that unconditionally reports readiness for multiple workers.
#
# @return [Boolean] always +true+
def multi_workers_ready?; true; end

#process(tag, es) ⇒ Object



235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
# File 'lib/fluent/plugin/out_notifier.rb', line 235

# Handles one chunk of events: runs #check, emits the resulting
# notifications, and periodically cleans up stored states.
#
# Both the emit step and the cleanup step run while holding +@mutex+.
#
# @param tag [String] tag of the incoming events
# @param es [#each] event stream, passed unchanged to #check
# @return [Object, nil] the refreshed cleanup timestamp when a cleanup ran,
#   otherwise +nil+
def process(tag, es)
  alerts = check(tag, es)
  @mutex.synchronize { suppressed_emit(alerts) } unless alerts.empty?

  # Cleanup only once more than STATES_CLEAN_INTERVAL has elapsed.
  since_cleanup = Fluent::Engine.now - @last_status_cleaned
  return unless since_cleanup > STATES_CLEAN_INTERVAL

  @mutex.synchronize do
    states_cleanup
    @last_status_cleaned = Fluent::Engine.now
  end
end

#start ⇒ Object



163
164
165
166
167
# File 'lib/fluent/plugin/out_notifier.rb', line 163

# Start hook: delegates to the parent implementation, then creates the mutex
# that #process synchronizes on and records the current engine time as the
# baseline for the periodic state cleanup.
#
# @return [Object] the value of Fluent::Engine.now stored in
#   +@last_status_cleaned+
def start
  super
  # Guards suppressed_emit and states_cleanup inside #process.
  @mutex = Mutex.new
  # #process compares against this to decide when to clean up states.
  @last_status_cleaned = Fluent::Engine.now
end

#states_cleanup ⇒ Object



189
190
191
192
193
194
195
196
# File 'lib/fluent/plugin/out_notifier.rb', line 189

# Drops every entry of +@states+ whose +last_notified+ is more than
# STATES_EXPIRE_SECONDS older than the current engine time.
#
# @return [Array] the keys +@states+ held before the cleanup
def states_cleanup
  current = Fluent::Engine.now
  # Walk a snapshot of the keys so entries can be removed while iterating.
  @states.keys.each do |state_key|
    idle_for = current - @states[state_key].last_notified
    @states.delete(state_key) if idle_for > STATES_EXPIRE_SECONDS
  end
end

#suppressed_emit(notifications) ⇒ Object



169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/fluent/plugin/out_notifier.rb', line 169

# Emits each notification through +router+ unless the state already stored
# for it asks for suppression.
#
# Each notification hash is mutated: its :hashkey, :match_def and :emit_tag
# entries are removed, and what remains is the record that gets emitted.
# A notification with no stored state is always emitted and a new State is
# stored under its hashkey; otherwise the existing state is asked via
# +suppress?+ and, when the emit goes through, told via +update_notified+.
#
# @param notifications [Array<Hash>] notifications as produced by #check
# @return [Array<Hash>] the same +notifications+ array
def suppressed_emit(notifications)
  emitted_at = Fluent::Engine.now

  notifications.each do |notice|
    state_key = notice.delete(:hashkey)
    matched_def = notice.delete(:match_def)
    emit_tag = notice.delete(:emit_tag)

    known = @states[state_key]
    next if known && known.suppress?(matched_def, notice)

    router.emit(emit_tag, emitted_at, notice)
    if known
      known.update_notified(matched_def, notice)
    else
      @states[state_key] = State.new(notice)
    end
  end
end