Class: Fluent::Plugin::SamplingFilter
- Inherits:
-
Filter
- Object
- Filter
- Fluent::Plugin::SamplingFilter
- Defined in:
- lib/fluent/plugin/filter_sampling.rb
Instance Method Summary collapse
- #configure(conf) ⇒ Object
-
#filter(tag, _time, record) ⇒ Object
Access to @counts SHOULD be protected by mutex, with a heavy penalty.
- #filter_simple(t, record) ⇒ Object
- #filter_with_minimum_rate(t, record) ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
12 13 14 15 16 17 |
# File 'lib/fluent/plugin/filter_sampling.rb', line 12 def configure(conf) super @counts = {} @resets = {} if @minimum_rate_per_min end |
#filter(tag, _time, record) ⇒ Object
Access to @counts SHOULD be protected by mutex, with a heavy penalty. Code below is not thread safe, but @counts (counter for sampling rate) is not so serious value (and probably will not be broken…), then i let here as it is now.
24 25 26 27 28 29 30 31 |
# File 'lib/fluent/plugin/filter_sampling.rb', line 24 def filter(tag, _time, record) t = @sample_unit == :all ? 'all' : tag if @minimum_rate_per_min filter_with_minimum_rate(t, record) else filter_simple(t, record) end end |
#filter_simple(t, record) ⇒ Object
33 34 35 36 37 38 39 40 41 42 |
# File 'lib/fluent/plugin/filter_sampling.rb', line 33 def filter_simple(t, record) c = (@counts[t] = @counts.fetch(t, 0) + 1) # reset only just before @counts[t] is to be bignum from fixnum @counts[t] = 0 if c > 0x6fffffff if c % @interval == 0 record else nil end end |
#filter_with_minimum_rate(t, record) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/fluent/plugin/filter_sampling.rb', line 44 def filter_with_minimum_rate(t, record) @resets[t] ||= Fluent::Clock.now + (60 - rand(30)) if Fluent::Clock.now > @resets[t] @resets[t] = Fluent::Clock.now + 60 @counts[t] = 0 end c = (@counts[t] = @counts.fetch(t, 0) + 1) if c < @minimum_rate_per_min || c % @interval == 0 record.dup else nil end end |