Class: Fluent::ThrottleFilter

Inherits:
Filter
  • Object
show all
Defined in:
lib/fluent/plugin/filter_throttle.rb

Defined Under Namespace

Classes: Group

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/fluent/plugin/filter_throttle.rb', line 21

# Validates the throttle settings and derives the values used while filtering.
#
# After delegating to the parent implementation this:
# * splits the configured group key on "." into @group_key_path,
# * computes @group_rate_limit (records per second) from the bucket limit
#   and the bucket period,
# * defaults @group_reset_rate_s to that rate limit when it is nil.
#
# @param conf configuration object; forwarded to the parent through `super`
# @raise [RuntimeError] when a setting is outside its allowed range
def configure(conf)
  super

  @group_key_path = group_key.split(".")

  raise "group_bucket_period_s must be > 0" \
    unless @group_bucket_period_s > 0

  raise "group_bucket_limit must be > 0" \
    unless @group_bucket_limit > 0

  # Divide as floats. With integer operands a limit smaller than the period
  # truncates to 0; the default reset rate would then be 0 as well, and the
  # "approximate rate < reset rate" check in #filter could never succeed, so
  # a throttled group would stay throttled forever.
  @group_rate_limit = @group_bucket_limit.to_f / @group_bucket_period_s

  # No explicit reset rate configured: fall back to the rate limit itself.
  @group_reset_rate_s = @group_rate_limit if @group_reset_rate_s.nil?

  raise "group_reset_rate_s must be >= -1" \
    unless @group_reset_rate_s >= -1
  raise "group_reset_rate_s must be <= group_bucket_limit / group_bucket_period_s" \
    unless @group_reset_rate_s <= @group_rate_limit

  raise "group_warning_delay_s must be >= 1" \
    unless @group_warning_delay_s >= 1
end

#extract_group(record) ⇒ Object



111
112
113
# File 'lib/fluent/plugin/filter_throttle.rb', line 111

# Looks up the throttling group of a record by walking @group_key_path
# (the key segments prepared in #configure) through the nested record.
#
# @param record the record to inspect; must respond to `dig`
# @return the value found at the key path, or nil when `dig` finds nothing
def extract_group(record)
  key_path = @group_key_path
  record.dig(*key_path)
end

#filter(tag, time, record) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/fluent/plugin/filter_throttle.rb', line 57

# Applies the per-group throttle to a single record.
#
# Each group (see #extract_group) has a counter object stored in @counters.
# The counter tracks an approximate records-per-second rate and how many
# records were seen in the current bucket period. A bucket_count of -1 marks
# a group whose credit is exhausted.
#
# @param tag    unused by this method
# @param time   unused by this method
# @param record the record being filtered
# @return the record when it is accepted, nil when it is dropped
def filter(tag, time, record)
  now = Time.now
  group = extract_group(record)

  # Create the group's counter on first sight. The arguments are positional
  # initial values; the field order is set where Group is declared, which is
  # not visible from this method.
  counter = (@counters[group] ||= Group.new(0, now, 0, 0, now, nil))

  counter.rate_count += 1

  # Recompute and store the approximate rate at most once per second.
  elapsed = now - counter.rate_last_reset
  if elapsed >= 1
    counter.aprox_rate = (counter.rate_count / elapsed).round
    counter.rate_count = 0
    counter.rate_last_reset = now
  end

  exhausted = counter.bucket_count == -1
  current_period = now.to_i / @group_bucket_period_s
  previous_period = counter.bucket_last_reset.to_i / @group_bucket_period_s

  if current_period > previous_period
    # A new time period has started. When the reset-rate check is enabled
    # (reset rate other than -1), an exhausted group stays throttled until
    # its approximate rate has dropped below the reset rate.
    if exhausted && @group_reset_rate_s != -1
      unless counter.aprox_rate < @group_reset_rate_s
        log_rate_limit_exceeded(now, group, counter)
        return nil
      end
      log_rate_back_down(now, group, counter)
    end

    # Start a fresh bucket for the new period.
    counter.bucket_count = 0
    counter.bucket_last_reset = now
  elsif exhausted
    # Still inside a period whose credit is used up: drop the record.
    log_rate_limit_exceeded(now, group, counter)
    return nil
  end

  counter.bucket_count += 1
  return record unless counter.bucket_count > @group_bucket_limit

  # Out of credit: drop this record and mark the group as exhausted so the
  # rest of the period is dropped as well.
  log_rate_limit_exceeded(now, group, counter)
  counter.bucket_count = -1
  nil
end

#log_items(now, group, counter) ⇒ Object



128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/fluent/plugin/filter_throttle.rb', line 128

# Builds the structured payload attached to throttle log messages.
#
# The reported rate is the larger of the counter's stored approximate rate
# and the rate computed from the current bucket (bucket count divided by
# the time since the bucket was last reset).
#
# @param now     current time; `now - counter.bucket_last_reset` must be numeric
# @param group   the group value reported under :group_key
# @param counter object exposing bucket_last_reset, bucket_count and aprox_rate
# @return [Hash] symbol-keyed hash with the observed rate and the configured
#   period, limit, rate limit and reset rate
def log_items(now, group, counter)
  since_last_reset = now - counter.bucket_last_reset

  # Guard the division: a zero elapsed time would produce Infinity or NaN,
  # and rounding either raises FloatDomainError. With no usable elapsed time
  # the bucket rate counts as 0, so the approximate rate below is reported.
  rate = if since_last_reset > 0
           (counter.bucket_count / since_last_reset).round
         else
           0
         end

  aprox_rate = counter.aprox_rate
  rate = aprox_rate if aprox_rate > rate

  {
    group_key: group,
    rate_s: rate,
    period_s: @group_bucket_period_s,
    limit: @group_bucket_limit,
    rate_limit_s: @group_rate_limit,
    reset_rate_s: @group_reset_rate_s
  }
end

#log_rate_back_down(now, group, counter) ⇒ Object



124
125
126
# File 'lib/fluent/plugin/filter_throttle.rb', line 124

# Logs, at info level, that a group's rate is back down, together with the
# payload built by #log_items.
#
# @param now     current time, forwarded to #log_items
# @param group   the group being reported
# @param counter the group's counter, forwarded to #log_items
# @return whatever `$log.info` returns
def log_rate_back_down(now, group, counter)
  details = log_items(now, group, counter)
  $log.info("rate back down", details)
end

#log_rate_limit_exceeded(now, group, counter) ⇒ Object



115
116
117
118
119
120
121
122
# File 'lib/fluent/plugin/filter_throttle.rb', line 115

# Emits a "rate exceeded" warning for a group, at most once every
# @group_warning_delay_s seconds per counter.
#
# A warning is emitted when the counter has never warned (last_warning is
# nil) or when at least @group_warning_delay_s has passed since its last
# warning; the counter's last_warning is then set to `now`.
#
# @param now     current time
# @param group   the group being reported
# @param counter the group's counter; its last_warning is read and updated
# @return `now` when a warning was emitted, nil otherwise
def log_rate_limit_exceeded(now, group, counter)
  previous = counter.last_warning
  due = previous.nil? || (now - previous) >= @group_warning_delay_s
  return unless due

  $log.warn("rate exceeded", log_items(now, group, counter))
  counter.last_warning = now
end

#shutdown ⇒ Object



52
53
54
55
# File 'lib/fluent/plugin/filter_throttle.rb', line 52

# Logs a summary of all group counters at info level, then hands over to
# the parent's shutdown.
#
# @return whatever the parent's shutdown returns
def shutdown
  summary = "counters summary: #{@counters}"
  $log.info(summary)
  super
end

#start ⇒ Object



46
47
48
49
50
# File 'lib/fluent/plugin/filter_throttle.rb', line 46

# Runs the parent's start, then initialises the empty per-group counter
# table that #filter fills in.
#
# @return [Hash] the new, empty counter table
def start
  super

  @counters = {}
end