32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
# File 'lib/fluent/plugin/filter_influxdb_deduplication.rb', line 32
# Writes a nanosecond-scale timestamp into +record+ under +@time_key+, built
# from the whole-second part of +time+ plus a per-second sequence counter.
#
# Reads and updates +@last_timestamp+ (the most recent second seen) and
# +@sequence+ (how many extra records have been stamped within that second).
#
# @param tag [Object] not used by this method
# @param time [Integer, Fluent::EventTime] event time; an Integer is wrapped
#   in a Fluent::EventTime before its +sec+ is read
# @param record [Hash] the record to stamp; mutated in place
# @return [Hash, nil] the stamped record, or nil when +time+ is of another
#   type, when the record is older than +@last_timestamp+ and +@out_of_order+
#   is not set, or when the per-second sequence counter is exhausted
def filter(tag, time, record)
  event_time =
    if time.is_a?(Integer)
      Fluent::EventTime.new(time)
    elsif time.is_a?(Fluent::EventTime)
      time
    else
      @log.error("unreadable time")
      return nil
    end

  seconds = event_time.sec
  base_nanos = seconds * 1_000_000_000
  sequence_limit = 999_999_999

  # Record is older than the newest second already seen: either flag it
  # (flag first, then timestamp, to keep the key insertion order) or drop it.
  if seconds < @last_timestamp
    @log.debug("out of sequence timestamp")
    unless @out_of_order
      @log.debug("out of order record dropped")
      return nil
    end
    record[@out_of_order] = true
    record[@time_key] = base_nanos
    return record
  end

  same_second = seconds == @last_timestamp
  if same_second && @sequence < sequence_limit
    # Another record within the current second: bump the counter so the
    # resulting timestamp differs from the previous one.
    @sequence += 1
    record[@time_key] = base_nanos + @sequence
  elsif same_second && @sequence == sequence_limit
    @log.error("received more then 999999999 records in a second")
    return nil
  else
    # A newer second (or a counter outside the expected range): start over.
    @sequence = 0
    @last_timestamp = seconds
    record[@time_key] = base_nanos
  end

  record[@out_of_order] = false if @out_of_order
  record
end
|