Class: Fluent::Plugin::SamplingFilter

Inherits:
Filter
  • Object
show all
Defined in:
lib/fluent/plugin/filter_sampling.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



12
13
14
15
16
17
# File 'lib/fluent/plugin/filter_sampling.rb', line 12

# Sets up per-instance sampling state after the parent class has processed
# the configuration.
#
# @param conf the plugin configuration, forwarded unchanged to the parent
def configure(conf)
  super

  # Number of events seen so far, per sampling key.
  @counts = {}

  # Reset deadlines are only tracked when a minimum per-minute rate is set.
  return unless @minimum_rate_per_min

  @resets = {}
end

#filter(tag, _time, record) ⇒ Object

Access to @counts SHOULD be protected by a mutex, but doing so would carry a heavy performance penalty. The code below is not thread-safe; however, @counts (the counter behind the sampling rate) is not a critical value and is unlikely to be corrupted in practice, so it is left unprotected for now.



24
25
26
27
28
29
30
31
# File 'lib/fluent/plugin/filter_sampling.rb', line 24

# Decides whether a single event is kept, delegating to the strategy
# selected by the configuration.
#
# @param tag the event tag; used as the sampling key unless @sample_unit is :all
# @param _time unused
# @param record the event record
# @return whatever the selected strategy method returns
def filter(tag, _time, record)
  # With sample_unit :all every event shares one counter under the key 'all'.
  key = if @sample_unit == :all
          'all'
        else
          tag
        end

  return filter_with_minimum_rate(key, record) if @minimum_rate_per_min

  filter_simple(key, record)
end

#filter_simple(t, record) ⇒ Object



33
34
35
36
37
38
39
40
41
42
# File 'lib/fluent/plugin/filter_sampling.rb', line 33

# Passes every @interval-th event for the given sampling key and drops the
# rest.
#
# @param t the sampling key used to look up the counter in @counts
# @param record the event record
# @return the record itself when it is sampled, otherwise nil
def filter_simple(t, record)
  c = (@counts[t] = @counts.fetch(t, 0) + 1)
  # Keep the counter bounded (the original intent was to stop it growing into
  # a bignum). Wrap to the remainder rather than to 0 so the position inside
  # the current interval is kept and the sampling cadence does not skew when
  # the wrap happens.
  @counts[t] = c % @interval if c > 0x6fffffff
  (c % @interval).zero? ? record : nil
end

#filter_with_minimum_rate(t, record) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/fluent/plugin/filter_sampling.rb', line 44

# Sampling strategy with a per-minute floor: within each window, events whose
# running count is below @minimum_rate_per_min are all passed; after that
# only every @interval-th event is passed.
#
# @param t the sampling key used for both @counts and @resets
# @param record the event record
# @return a shallow copy of the record when it is passed, otherwise nil
def filter_with_minimum_rate(t, record)
  # First event for this key: schedule the first reset 31-60 seconds from now
  # (60 minus a random 0..29).
  @resets[t] = Fluent::Clock.now + (60 - rand(30)) unless @resets[t]

  # Window expired: start a new 60-second window and restart the count.
  if Fluent::Clock.now > @resets[t]
    @resets[t] = Fluent::Clock.now + 60
    @counts[t] = 0
  end

  seen = @counts.fetch(t, 0) + 1
  @counts[t] = seen

  return nil unless seen < @minimum_rate_per_min || (seen % @interval).zero?

  record.dup
end