Class: Fluent::Plugin::ThrottleFilter

Inherits:
Filter
  • Object
show all
Defined in:
lib/fluent/plugin/filter_throttle.rb

Defined Under Namespace

Classes: Group

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/fluent/plugin/filter_throttle.rb', line 46

# Validates the throttle settings and derives the values the filter relies on.
#
# Derived state written by this method:
# - @group_key_paths: every entry of `group_key` split on "." into path segments.
# - @group_rate_limit: group_bucket_limit divided by group_bucket_period_s.
# - @group_reset_rate_s: falls back to @group_rate_limit when it was left nil.
#
# @param conf [Object] configuration object, handed to the parent class via `super`
# @return [nil]
# @raise [RuntimeError] when any of the settings is outside its allowed range
def configure(conf)
  super

  @group_key_paths = group_key.map { |key| key.split(".") }

  raise "group_bucket_period_s must be > 0" unless @group_bucket_period_s > 0
  raise "group_bucket_limit must be > 0" unless @group_bucket_limit > 0

  # Plain Integer division truncates: a limit smaller than the period would
  # yield a rate limit of 0, and a default reset rate of 0 can never be
  # undercut by the `aprox_rate < @group_reset_rate_s` check in #filter, so a
  # throttled group would never recover. Keep the exact quotient when the
  # division is even (unchanged behaviour) and fall back to a fractional
  # rate otherwise.
  @group_rate_limit =
    if (@group_bucket_limit % @group_bucket_period_s).zero?
      @group_bucket_limit / @group_bucket_period_s
    else
      @group_bucket_limit.fdiv(@group_bucket_period_s)
    end

  @group_reset_rate_s = @group_rate_limit if @group_reset_rate_s.nil?

  # -1 is accepted as a special value; #filter compares against it to skip
  # the "wait until the rate drops" check.
  raise "group_reset_rate_s must be >= -1" unless @group_reset_rate_s >= -1
  raise "group_reset_rate_s must be <= group_bucket_limit / group_bucket_period_s" \
    unless @group_reset_rate_s <= @group_rate_limit

  raise "group_warning_delay_s must be >= 1" unless @group_warning_delay_s >= 1
end

#filter(tag, time, record) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/fluent/plugin/filter_throttle.rb', line 82

# Applies per-group throttling to a single record.
#
# Each group (as produced by extract_group) owns a counter that tracks an
# approximate per-second rate and a per-period bucket count. A bucket_count
# of -1 marks the group as throttled.
#
# @param tag [Object] not used by this method
# @param time [Object] not used by this method
# @param record [Object] the record to evaluate
# @return [Object, nil] the record when it is accepted; for a throttled
#   record, nil when @group_drop_logs is truthy, otherwise the record itself
def filter(tag, time, record)
  now = Time.now
  # Value handed back for a throttled record: nil drops it.
  throttled_result = @group_drop_logs ? nil : record
  group = extract_group(record)
  counter = @counters[group] ||= Group.new(0, now, 0, 0, now, nil)
  counter.rate_count += 1

  # Refresh the approximate rate once at least one second has elapsed.
  elapsed = now - counter.rate_last_reset
  if elapsed >= 1
    counter.aprox_rate = (counter.rate_count / elapsed).round
    counter.rate_count = 0
    counter.rate_last_reset = now
  end

  current_period = now.to_i / @group_bucket_period_s
  previous_period = counter.bucket_last_reset.to_i / @group_bucket_period_s
  throttled = counter.bucket_count == -1

  if current_period > previous_period
    # A new time period has started. When the reset-rate check is enabled
    # (reset rate is not -1), a throttled group stays throttled until its
    # rate has dropped below the reset rate.
    if throttled && @group_reset_rate_s != -1
      unless counter.aprox_rate < @group_reset_rate_s
        log_rate_limit_exceeded(now, group, counter)
        return throttled_result
      end

      log_rate_back_down(now, group, counter)
    end

    # Start a fresh bucket for this period.
    counter.bucket_count = 0
    counter.bucket_last_reset = now
  elsif throttled
    # Still inside a period whose credit has been used up.
    log_rate_limit_exceeded(now, group, counter)
    return throttled_result
  end

  counter.bucket_count += 1
  return record unless counter.bucket_count > @group_bucket_limit

  # Credit exhausted: mark the group as throttled for the rest of the period.
  log_rate_limit_exceeded(now, group, counter)
  counter.bucket_count = -1
  throttled_result
end

#shutdown ⇒ Object



77
78
79
80
# File 'lib/fluent/plugin/filter_throttle.rb', line 77

# Writes a debug-level summary of the per-group counters, then defers to the
# parent class's shutdown.
#
# @return [Object] whatever the parent implementation returns
def shutdown
  summary = "counters summary: #{@counters}"
  log.debug(summary)
  super
end

#start ⇒ Object



71
72
73
74
75
# File 'lib/fluent/plugin/filter_throttle.rb', line 71

# Runs the parent class's start logic, then begins with an empty table of
# per-group counters (the hash is keyed by group in #filter).
#
# @return [Hash] the freshly created, empty counters hash
def start
  super
  @counters = {}
end