Class: Fluent::Plugin::SamplingFilter

Inherits:
Filter
  • Object
show all
Defined in:
lib/fluent/plugin/filter_sampling.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



11
12
13
14
15
16
# File 'lib/fluent/plugin/filter_sampling.rb', line 11

# Applies the plugin configuration and initializes the sampling state.
#
# Delegates to the parent implementation first, then creates an empty
# per-key counter table. The table of per-key reset times is created only
# when @minimum_rate_per_min is set.
#
# @param conf [Object] configuration passed through unchanged to +super+
# @return [Hash, nil] the new reset table, or nil when no minimum rate is set
def configure(conf)
  super

  @counts = {}
  return unless @minimum_rate_per_min

  @resets = {}
end

#filter(tag, _time, record) ⇒ Object

Access to @counts should ideally be protected by a mutex, but that would carry a heavy performance penalty. The code below is not thread-safe; however, @counts (the counter used for the sampling rate) is not a critical value and is unlikely to be corrupted in practice, so it is left unsynchronized for now.



23
24
25
26
27
28
29
30
# File 'lib/fluent/plugin/filter_sampling.rb', line 23

# Decides whether a single record passes the sampler.
#
# Records are counted under the literal key 'all' when @sample_unit is :all,
# and under their own tag otherwise. The decision itself is delegated to
# #filter_with_minimum_rate when @minimum_rate_per_min is set, and to
# #filter_simple when it is not.
#
# @param tag [Object] tag of the incoming record
# @param _time [Object] unused
# @param record [Object] the incoming record
# @return [Object, nil] whatever the selected strategy returns
def filter(tag, _time, record)
  counter_key = tag
  counter_key = 'all' if @sample_unit == :all

  return filter_with_minimum_rate(counter_key, record) if @minimum_rate_per_min

  filter_simple(counter_key, record)
end

#filter_simple(t, record) ⇒ Object



32
33
34
35
36
37
38
39
40
41
# File 'lib/fluent/plugin/filter_sampling.rb', line 32

# Passes every @interval-th record counted under key +t+.
#
# The per-key counter is incremented on every call. The pass/drop decision
# uses the incremented value, including on the call where the stored counter
# is wrapped back to zero.
#
# @param t [Object] counter key (a tag, or 'all')
# @param record [Object] the incoming record
# @return [Object, nil] +record+ itself when it is sampled, otherwise nil
def filter_simple(t, record)
  seen = @counts.fetch(t, 0) + 1
  # Wrap the stored counter once it exceeds 0x6fffffff; per the original
  # author's note this keeps it from growing from a fixnum into a bignum.
  @counts[t] = seen > 0x6fffffff ? 0 : seen

  record if (seen % @interval).zero?
end

#filter_with_minimum_rate(t, record) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/fluent/plugin/filter_sampling.rb', line 43

# Samples records counted under key +t+ while guaranteeing a minimum number
# of passes per counting window.
#
# The first @minimum_rate_per_min records of the current window always pass.
# After that, only every @interval-th record passes. When the clock has
# moved past the reset time stored for +t+, the counter is zeroed and the
# next reset is scheduled 60 clock units later (presumably seconds;
# Fluent::Clock is defined outside this file).
#
# @param t [Object] counter key (a tag, or 'all')
# @param record [Object] the incoming record; must respond to +dup+
# @return [Object, nil] +record.dup+ when the record passes, otherwise nil
def filter_with_minimum_rate(t, record)
  # Read the clock once so every comparison in this call sees the same time.
  now = Fluent::Clock.now

  # First record for this key: schedule its first reset 31..60 units ahead.
  @resets[t] ||= now + (60 - rand(30))
  if now > @resets[t]
    @resets[t] = now + 60
    @counts[t] = 0
  end

  count = (@counts[t] = @counts.fetch(t, 0) + 1)
  # `<=` rather than `<`: the minimum must cover exactly
  # @minimum_rate_per_min records. With `<` only N-1 were guaranteed, and a
  # minimum of 1 guaranteed nothing.
  return unless count <= @minimum_rate_per_min || (count % @interval).zero?

  record.dup
end