Class: Fluent::TimeSamplingFilter

Inherits:
Filter
  • Object
show all
Defined in:
lib/fluent/plugin/filter_time_sampling.rb

Instance Method Summary collapse

Instance Method Details

#cache(key, data = nil) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/fluent/plugin/filter_time_sampling.rb', line 60

# Combined reader/writer for the per-key sampling cache.
#
# With +data+ given, stores it under +key+ with an expiry of now + @interval
# and returns the stored entry hash.
#
# With +data+ omitted (nil), looks +key+ up. An unknown key is first seeded
# with a placeholder entry whose expiry is the epoch, so it always reads as
# expired. Returns the cached data while the entry is still live, nil once
# its expiry time has passed.
def cache(key, data = nil)
  unless data.nil?
    return @cache[key] = { :data => data, :time => Time.now + @interval }
  end

  entry = (@cache[key] ||= { :time => Time.at(0) })
  return nil if Time.now > entry[:time]

  entry[:data]
end

#clear_unused_cache ⇒ Object



73
74
75
76
77
78
# File 'lib/fluent/plugin/filter_time_sampling.rb', line 73

# Drops every cache entry whose expiry time lies more than @cache_period
# seconds in the past. Entries that have expired but are still inside that
# grace period are kept.
#
# Uses Hash#delete_if so the hash is pruned through the API meant for it
# rather than by calling #delete from inside an #each_key loop, and samples
# the clock once so every entry is judged against the same instant.
#
# Returns @cache (the pruned hash).
def clear_unused_cache
  now = Time.now
  @cache.delete_if { |_key, entry| now > entry[:time] + @cache_period }
end

#configure(conf) ⇒ Object



15
16
17
18
19
20
21
22
# File 'lib/fluent/plugin/filter_time_sampling.rb', line 15

# Runs the superclass configuration, then initialises the sampling state.
#
# * @cache_period is twice the interval as configured, taken BEFORE the
#   zero-interval remap below (so an interval of 0 gives a period of 0).
# * An interval of 0 is replaced with -1, which makes every entry written
#   by #cache expire immediately.
# * Stale-entry sweeps are scheduled every 300 seconds, counted from now.
#
# Assumes the superclass's configure has populated @interval from +conf+
# (TODO confirm against the parameter declarations, not shown here).
def configure(conf)
  super

  configured_interval = @interval
  @cache_period = configured_interval * 2
  @interval = configured_interval.zero? ? -1 : configured_interval

  @cache = {}
  @unused_cache_clear_interval = 300
  @unused_cache_clear_time = Time.now
end

#filter(tag, time, record) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/fluent/plugin/filter_time_sampling.rb', line 32

# Passes through at most one record per cache key per sampling interval.
#
# The cache key is built from the entries of @unit joined with ":"; the
# literal "${tag}" stands for the event tag, any other entry names a record
# field whose value is used.
#
# tag    - event tag, substituted for "${tag}" units.
# time   - event time (not used by this method).
# record - Hash-like event record.
#
# Returns a copy of the record (restricted to @keep_keys when that list is
# non-empty) when no live cache entry exists for the key, otherwise nil.
# NOTE(review): presumably a nil return tells Fluentd to drop the event;
# confirm against the Filter plugin API.
#
# Raises RuntimeError when a unit field or a keep_keys field is missing from
# the record.
def filter(tag, time, record)
  key_parts = @unit.map do |unit|
    next tag if unit == "${tag}"
    raise "#{unit}: unit not found" unless record.has_key?(unit)

    record[unit]
  end
  cache_key = key_parts.join(":")

  @keep_keys.each do |key|
    raise "#{key}: keep_keys not found" unless record.has_key?(key)
  end

  sampled = nil
  unless cache(cache_key)
    # Mark the key as seen so later records within the interval are dropped.
    cache(cache_key, "")
    if @keep_keys.empty?
      sampled = record.dup
    else
      sampled = record.select { |field, _value| @keep_keys.include?(field) }
    end
  end

  # Periodically sweep entries that have been expired for a long time.
  if Time.now > @unused_cache_clear_time + @unused_cache_clear_interval
    clear_unused_cache
    @unused_cache_clear_time = Time.now
  end

  sampled
end

#shutdown ⇒ Object



28
29
30
# File 'lib/fluent/plugin/filter_time_sampling.rb', line 28

# Adds no behavior of its own: it only delegates to the superclass's
# shutdown via +super+.
def shutdown
  super
end

#start ⇒ Object



24
25
26
# File 'lib/fluent/plugin/filter_time_sampling.rb', line 24

# Adds no behavior of its own: it only delegates to the superclass's
# start via +super+.
def start
  super
end