Class: Fluent::Plugin::ElasticsearchTimestampCheckFilter

Inherits:
Filter
  • Object
show all
Defined in:
lib/fluent/plugin/filter_elasticsearch_timestamp_check.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#timestamp_digits ⇒ Object (readonly)

Returns the value of attribute timestamp_digits.



5
6
7
# File 'lib/fluent/plugin/filter_elasticsearch_timestamp_check.rb', line 5

# Reader for @timestamp_digits.
#
# #configure assigns this from #configure_digits; #filter consults it to
# decide whether an all-digit value has an acceptable number of digits to be
# treated as an epoch timestamp.
#
# @return [Object] whatever is currently stored in @timestamp_digits
#   (nil until #configure has run)
def timestamp_digits
  @timestamp_digits
end

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


11
12
13
14
15
16
17
# File 'lib/fluent/plugin/filter_elasticsearch_timestamp_check.rb', line 11

# Validates the plugin settings and precomputes the values #filter relies on.
#
# After delegating to the superclass, this method:
# * loads the 'date' library (DateTime is used by #filter),
# * rejects a subsecond_precision lower than 1,
# * builds the strftime pattern with that many fractional-second digits,
# * stores the accepted epoch digit counts returned by #configure_digits.
#
# @param conf configuration object, forwarded implicitly to super
#   (presumably a Fluentd config element -- confirm against the base class)
# @raise [Fluent::ConfigError] when subsecond_precision is below 1
def configure(conf)
  super
  require 'date'

  if subsecond_precision < 1
    raise Fluent::ConfigError, "specify 1 or bigger number."
  end

  # %<n>N renders exactly <n> fractional-second digits.
  @strftime_format = "%Y-%m-%dT%H:%M:%S.%#{@subsecond_precision}N%z".freeze
  @timestamp_digits = configure_digits
end

#configure_digits ⇒ Object



19
20
21
22
23
24
25
# File 'lib/fluent/plugin/filter_elasticsearch_timestamp_check.rb', line 19

# Lists the digit counts an all-digit value may have to be accepted as an
# epoch timestamp by #filter.
#
# 10 and 13 are always present; 10 + @subsecond_precision is appended for the
# configured precision. Duplicates are dropped, so a precision of 3 yields
# just [10, 13].
#
# @return [Array<Integer>] a new array of unique digit counts, in the order
#   10, 13, then the precision-derived length (when distinct)
def configure_digits
  precision_length = 10 + @subsecond_precision
  [10, 13, precision_length].uniq
end

#filter(tag, time, record) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/fluent/plugin/filter_elasticsearch_timestamp_check.rb', line 35

# Ensures the record carries an '@timestamp' rendered with @strftime_format.
#
# The fields '@timestamp', 'timestamp', 'time' and 'syslog_timestamp' are
# tried in that order. The first value that can be interpreted wins and is
# written to both '@timestamp' and 'fluent_converted_timestamp':
#
# * An Integer, or a String consisting only of digits (optional sign), is
#   treated as an epoch value, but only when it is positive and its digit
#   count is listed in @timestamp_digits. The first 10 digits are taken as
#   seconds, the remainder as the sub-second fraction.
# * Any other String is handed to DateTime.parse.
# * Values that are neither String nor Integer are skipped.
#
# When no field yields a timestamp, the event time is rendered instead and
# written to both '@timestamp' and 'fluent_added_timestamp'.
#
# @param tag event tag (not used by this method)
# @param time event time; a Fluent::EventTime or a value accepted by Time.at
# @param record [Hash] the event record; it is modified in place
# @return [Hash] the same record object
def filter(tag, time, record)
  candidates = %w{@timestamp timestamp time syslog_timestamp}.map do |field|
    record[field]
  end.compact

  candidates.each do |timestamp|
    begin
      # Numeric epochs may arrive as Integer rather than String; handle them
      # like their all-digit string form.
      value = timestamp.is_a?(Integer) ? timestamp.to_s : timestamp
      # Anything else that is not a String cannot be matched or parsed below
      # and would raise an error this method does not rescue.
      next unless value.is_a?(String)

      # An all-digit entry is treated as an epoch value.
      if value =~ /\A[-+]?\d+\z/
        epoch = value.to_i
        # Zero and negative values are not usable epoch timestamps.
        next unless epoch > 0

        # By default an epoch value must have 10 digits (seconds) or
        # 13 digits (milliseconds); any other length is considered invalid
        # (until the next digit rollover at 2286-11-20 17:46:40 Z).
        # The length for the user-defined precision is accepted as well.
        digits = epoch.to_s.length
        next unless @timestamp_digits.include?(digits)

        # Rational keeps the sub-second part exact. Float division could drop
        # the last fractional digit because strftime's %N truncates.
        record['@timestamp'] = record['fluent_converted_timestamp'] =
          Time.at(Rational(epoch, 10 ** (digits - 10))).strftime(@strftime_format)
        break
      end

      # Normal timestamp string processing.
      record['@timestamp'] = record['fluent_converted_timestamp'] =
        DateTime.parse(value).strftime(@strftime_format)
      $log.debug("Timestamp parsed: #{record['@timestamp']}")
      break
    rescue ArgumentError
      # Unparsable value: fall through and try the next candidate field.
    end
  end

  unless record['fluent_converted_timestamp']
    # NOTE(review): to_int presumably drops the sub-second part of a
    # Fluent::EventTime -- confirm whether that loss is intended.
    record['@timestamp'] = record['fluent_added_timestamp'] =
      Time.at(time.is_a?(Fluent::EventTime) ? time.to_int : time).strftime(@strftime_format)
    $log.debug("Timestamp added: #{record['@timestamp']}")
  end

  record
end

#shutdown ⇒ Object



31
32
33
# File 'lib/fluent/plugin/filter_elasticsearch_timestamp_check.rb', line 31

# Shutdown hook. Adds no behaviour of its own; it only delegates to the
# superclass implementation.
def shutdown
  super
end

#start ⇒ Object



27
28
29
# File 'lib/fluent/plugin/filter_elasticsearch_timestamp_check.rb', line 27

# Start hook. Adds no behaviour of its own; it only delegates to the
# superclass implementation.
def start
  super
end