Class: Fluent::ThrottleFilter

Inherits:
Filter
  • Object
show all
Defined in:
lib/fluent/plugin/filter_throttle.rb

Defined Under Namespace

Classes: Bucket, Group

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/fluent/plugin/filter_throttle.rb', line 22

# Validates the throttle settings and precomputes values derived from them.
#
# Reads the +group_key+ accessor and the @group_bucket_period_s,
# @group_bucket_limit, @group_reset_rate_s and @warning_hz instance
# variables. These are declared outside this method; presumably +super+
# populates them from the plugin configuration (TODO confirm).
#
# Sets @group_key_path, @group_rate_limit, @group_reset_rate_s (only when it
# was nil) and @warning_delay.
#
# @param conf configuration object, forwarded untouched to +super+
# @raise [RuntimeError] when a setting is outside its allowed range
def configure(conf)
  super

  # "a.b.c" -> ["a", "b", "c"], the nested key path later handed to #dig.
  @group_key_path = group_key.split(".")

  raise "group_bucket_period_s must be > 0" unless @group_bucket_period_s > 0
  raise "group_bucket_limit must be > 0" unless @group_bucket_limit > 0

  # Float division on purpose: with Integer operands `limit / period`
  # truncates, so any limit smaller than the period produced a rate limit
  # (and default reset rate) of 0, a threshold no measured rate can ever be
  # strictly below.
  @group_rate_limit = @group_bucket_limit.fdiv(@group_bucket_period_s)

  # The reset rate defaults to the sustained rate limit.
  @group_reset_rate_s = @group_rate_limit if @group_reset_rate_s.nil?

  raise "group_reset_rate_s must be >= -1" unless @group_reset_rate_s >= -1
  raise "group_reset_rate_s must be <= group_bucket_limit / group_bucket_period_s" \
    unless @group_reset_rate_s <= @group_rate_limit

  # Minimum number of seconds between two repeated warnings.
  @warning_delay = 1.0 / @warning_hz
end

#extract_group(record) ⇒ Object



112
113
114
# File 'lib/fluent/plugin/filter_throttle.rb', line 112

# Looks up the group value of +record+ by walking the nested key path
# stored in @group_key_path.
#
# @param record an object responding to #dig (NOTE(review): presumably the
#   event Hash -- confirm against the caller)
# @return the value found at the key path, as returned by #dig
def extract_group(record)
  key_path = @group_key_path
  record.dig(*key_path)
end

#filter(tag, time, record) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/fluent/plugin/filter_throttle.rb', line 57

# Applies the per-group throttle to a single record.
#
# Each group (see #extract_group) has a counter object held in @counters.
# A bucket_count of -1 marks the group as currently throttled.
#
# @param tag not used by this method
# @param time not used by this method
# @param record the record to inspect
# @return +record+ when it is let through, +nil+ when it is dropped
def filter(tag, time, record)
  now = Time.now
  group = extract_group(record)

  # Create the counter lazily the first time a group is seen.
  counter = @counters.fetch(group) do
    @counters[group] = Group.new(0, now, 0, 0, now, nil)
  end

  counter.rate_count += 1

  # Compute and store the rate per second at most once every second.
  rate_window = now - counter.rate_last_reset
  if rate_window >= 1
    counter.aprox_rate = (counter.rate_count / rate_window).round
    counter.rate_count = 0
    counter.rate_last_reset = now
  end

  current_period = now.to_i / @group_bucket_period_s
  previous_period = counter.bucket_last_reset.to_i / @group_bucket_period_s

  if current_period > previous_period
    # A new time period has started: the bucket may be reset.
    if counter.bucket_count == -1 && @group_reset_rate_s != -1
      # The group is throttled; keep dropping until its rate falls below
      # the reset rate.
      unless counter.aprox_rate < @group_reset_rate_s
        # Repeat the warning, but no more often than @warning_delay allows.
        if now - counter.last_warning >= @warning_delay
          log_rate_limit_exceeded(now, group, counter)
          counter.last_warning = now
        end
        return nil
      end

      log_rate_back_down(now, group, counter)
    end

    counter.bucket_count = 0
    counter.bucket_last_reset = now
  end

  # Still throttled within the current period.
  return nil if counter.bucket_count == -1

  counter.bucket_count += 1
  return record unless counter.bucket_count > @group_bucket_limit

  # Limit just exceeded: warn once and mark the group as throttled.
  log_rate_limit_exceeded(now, group, counter)
  counter.last_warning = now
  counter.bucket_count = -1
  nil
end

#log_items(now, group, counter) ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/fluent/plugin/filter_throttle.rb', line 124

# Builds the structured payload attached to the throttle log messages.
#
# The reported rate is the larger of the rate observed over the current
# bucket and the counter's stored approximate rate.
#
# @param now [Time] the current time
# @param group the group value the counter belongs to
# @param counter object exposing bucket_last_reset, bucket_count and
#   aprox_rate
# @return [Hash] symbol-keyed hash with :group_key, :rate_s, :period_s,
#   :limit, :rate_limit_s and :reset_rate_s
def log_items(now, group, counter)
  elapsed = now - counter.bucket_last_reset

  # Guard the division: when no time has elapsed since the bucket was reset
  # (e.g. two calls within the clock's resolution) the quotient is infinite
  # or NaN and #round would raise FloatDomainError.
  bucket_rate = elapsed > 0 ? (counter.bucket_count / elapsed).round : 0

  rate = [bucket_rate, counter.aprox_rate].max

  {
    group_key: group,
    rate_s: rate,
    period_s: @group_bucket_period_s,
    limit: @group_bucket_limit,
    rate_limit_s: @group_rate_limit,
    reset_rate_s: @group_reset_rate_s
  }
end

#log_rate_back_down(now, group, counter) ⇒ Object



120
121
122
# File 'lib/fluent/plugin/filter_throttle.rb', line 120

# Emits an info-level "rate back down" message on the global logger, with
# the payload built by #log_items.
#
# @param now the current time, forwarded to #log_items
# @param group the group value, forwarded to #log_items
# @param counter the group's counter, forwarded to #log_items
def log_rate_back_down(now, group, counter)
  details = log_items(now, group, counter)
  $log.info("rate back down", details)
end

#log_rate_limit_exceeded(now, group, counter) ⇒ Object



116
117
118
# File 'lib/fluent/plugin/filter_throttle.rb', line 116

# Emits a warn-level "rate exceeded" message on the global logger, with the
# payload built by #log_items.
#
# @param now the current time, forwarded to #log_items
# @param group the group value, forwarded to #log_items
# @param counter the group's counter, forwarded to #log_items
def log_rate_limit_exceeded(now, group, counter)
  details = log_items(now, group, counter)
  $log.warn("rate exceeded", details)
end

#shutdown ⇒ Object



52
53
54
55
# File 'lib/fluent/plugin/filter_throttle.rb', line 52

# Logs a summary of the per-group counters at info level, then hands over
# to the parent implementation.
def shutdown
  summary = "counters summary: #{@counters}"
  $log.info(summary)
  super
end

#start ⇒ Object



46
47
48
49
50
# File 'lib/fluent/plugin/filter_throttle.rb', line 46

# Runs the parent implementation, then initialises the empty table that
# maps each group value to its counter.
def start
  super
  @counters = {}
end