Class: LogStash::Filters::KV

Inherits:
Base
  • Object
show all
Extended by:
PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter
Includes:
PluginMixins::ECSCompatibilitySupport, PluginMixins::ECSCompatibilitySupport::TargetCheck
Defined in:
lib/logstash/filters/kv.rb

Overview

This filter helps automatically parse messages (or specific event fields) which are of the `foo=bar` variety.

For example, if you have a log message which contains `ip=1.2.3.4 error=REFUSED`, you can parse those automatically by configuring:

[source,ruby]

    filter {
      kv { }
    }

The above will result in a message of `ip=1.2.3.4 error=REFUSED` having the fields:

  • `ip: 1.2.3.4`

  • `error: REFUSED`

This is great for postfix, iptables, and other types of logs that tend towards `key=value` syntax.

You can configure any arbitrary strings to split your data on, in case your data is not structured using `=` signs and whitespace. For example, this filter can also be used to parse query parameters like `foo=bar&baz=fizz` by setting the `field_split` parameter to `&`.

Defined Under Namespace

Classes: TimeoutException

Constant Summary collapse

TRANSFORM_LOWERCASE_KEY =

Constants used for transform check

"lowercase"
TRANSFORM_UPPERCASE_KEY =
"uppercase"
TRANSFORM_CAPITALIZE_KEY =
"capitalize"
EMPTY_STRING =
''.freeze

Instance Method Summary collapse

Instance Method Details

#close ⇒ Object



466
467
# File 'lib/logstash/filters/kv.rb', line 466

# Intentionally a no-op: performs no work and returns nil.
def close
  nil
end

#filter(event) ⇒ Object



434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
# File 'lib/logstash/filters/kv.rb', line 434

# Extracts key/value pairs from the event's `@source` field and stores them on the event.
#
# The parsed pairs are merged over `@default_keys` (parsed values win). When the
# merged result is empty the method returns without touching the event. Otherwise
# the pairs are written either under `@target` (replacing whatever was there) or
# as individual event fields, and `filter_matched` is invoked.
#
# A TimeoutException tags the event with `@tag_on_timeout`; any other
# StandardError tags it with every entry of `@tag_on_failure`. Both cases are
# logged at warn level and neither is re-raised.
#
# @param event the event being processed (presumably a LogStash::Event; it is
#   used through #get, #set, #include? and #tag)
def filter(event)
  value = event.get(@source)

  # Only go through Timeout.timeout when a positive timeout is configured; this avoids
  # allocating a block per event even though Timeout.timeout itself bypasses 0s timeouts.
  parsed =
    if @timeout_seconds > 0.0
      Timeout.timeout(@timeout_seconds, TimeoutException) { parse_value(value, event) }
    else
      parse_value(value, event)
    end

  # Defaults only fill in keys that the parse did not produce.
  pairs = @default_keys.merge(parsed)
  return if pairs.empty?

  if @target
    if event.include?(@target) && @logger.debug?
      @logger.debug("Overwriting existing target field", field: @target, existing_value: event.get(@target))
    end
    event.set(@target, pairs)
  else
    pairs.each_pair { |field, field_value| event.set(field, field_value) }
  end

  filter_matched(event)
rescue TimeoutException
  logger.warn("Timeout reached in KV filter with value #{summarize(value)}")
  event.tag(@tag_on_timeout)
rescue => error
  details = { :exception => error.message }
  details[:backtrace] = error.backtrace if logger.debug?
  logger.warn('Exception while parsing KV', details)
  @tag_on_failure.each { |failure_tag| event.tag(failure_tag) }
end

#register ⇒ Object



346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
# File 'lib/logstash/filters/kv.rb', line 346

# Validates the splitter configuration and precompiles everything the filter needs per event.
#
# Steps, in order:
# * warns when the `jruby.regexp.interruptible` system property is not "true";
# * raises LogStash::ConfigurationError when `value_split` is empty, or when
#   `field_split_pattern` / `value_split_pattern` are set but empty;
# * compiles the optional trim / remove-char regexps;
# * builds `@scan_re` (key, value splitter, optional value, field splitter or EOF)
#   and stores the value splitter as `@value_split_re`;
# * converts `@timeout_millis` into fractional `@timeout_seconds`.
#
# @raise [LogStash::ConfigurationError] when one of the splitter options is empty
def register
  # The interruptible flag can no longer be set at this point, so the best we can do is warn.
  require 'java'
  unless java.lang.System.getProperty("jruby.regexp.interruptible") == "true"
    logger.warn("KV Filter registered without jruby interruptible regular expressions enabled (`-Djruby.regexp.interruptible=true`); timeouts may not be respected.")
  end

  # `value_split` is checked unconditionally; the `*_pattern` options only when they are set.
  empty_option =
    if @value_split.empty?
      'value_split'
    elsif @field_split_pattern && @field_split_pattern.empty?
      'field_split_pattern'
    elsif @value_split_pattern && @value_split_pattern.empty?
      'value_split_pattern'
    end

  if empty_option
    raise LogStash::ConfigurationError, I18n.t(
      "logstash.runner.configuration.invalid_plugin_register",
      :plugin => "filter",
      :type => "kv",
      :error => "Configuration option '#{empty_option}' must be a non-empty string"
    )
  end

  # Character-class based helpers for trimming and stripping keys/values.
  if @trim_value
    @trim_value_re = Regexp.new("^[#{@trim_value}]+|[#{@trim_value}]+$")
  end
  if @trim_key
    @trim_key_re = Regexp.new("^[#{@trim_key}]+|[#{@trim_key}]+$")
  end
  if @remove_char_value
    @remove_char_value_re = Regexp.new("[#{@remove_char_value}]")
  end
  if @remove_char_key
    @remove_char_key_re = Regexp.new("[#{@remove_char_key}]")
  end

  optional_whitespace = / */
  eof = /$/

  # An explicit pattern wins; otherwise the split characters are treated as a character class.
  field_splitter_re = Regexp.new(@field_split_pattern || "[#{@field_split}]")
  value_splitter_re = Regexp.new(@value_split_pattern || "[#{@value_split}]")

  # in legacy-compatible lenient mode, the value splitter can be wrapped in optional whitespace
  if @whitespace == 'lenient'
    value_splitter_re = /#{optional_whitespace}#{value_splitter_re}#{optional_whitespace}/
  end

  # a key is a _captured_ sequence of characters or escaped spaces before optional whitespace
  # and followed by either a `value_split`, a `field_split`, or EOF.
  key_pattern =
    if original_params.include?('value_split_pattern') || original_params.include?('field_split_pattern')
      unquoted_capture_until_pattern(value_splitter_re, field_splitter_re)
    else
      unquoted_capture_until_charclass(@value_split + @field_split)
    end

  # each component expression within the value pattern _must_ capture exactly once.
  value_alternatives = [
    quoted_capture(%q(")), # quoted double
    quoted_capture(%q('))  # quoted single
  ]
  if @include_brackets
    value_alternatives << quoted_capture('(', ')') # bracketed paren
    value_alternatives << quoted_capture('[', ']') # bracketed square
    value_alternatives << quoted_capture('<', '>') # bracketed angle
  end

  # an unquoted value is a _captured_ sequence of characters or escaped spaces before a `field_split` or EOF.
  value_alternatives <<
    if original_params.include?('field_split_pattern')
      unquoted_capture_until_pattern(field_splitter_re)
    else
      unquoted_capture_until_charclass(@field_split)
    end

  value_pattern = Regexp.union(value_alternatives)
  terminator = Regexp.union(field_splitter_re, eof)

  @scan_re = /#{key_pattern}#{value_splitter_re}#{value_pattern}?#{terminator}/
  @value_split_re = value_splitter_re

  @logger.debug("KV scan regex", :regex => @scan_re.inspect) if @logger.debug?

  # divide by float to allow fractional seconds, the Timeout class timeout value is in seconds but the underlying
  # executor resolution is in microseconds so fractional second parameter down to microseconds is possible.
  # see https://github.com/jruby/jruby/blob/9.2.7.0/core/src/main/java/org/jruby/ext/timeout/Timeout.java#L125
  @timeout_seconds = @timeout_millis / 1000.0
end