Class: Fluent::Plugin::ThrottleFilter

Inherits:
Filter
  • Object
show all
Defined in:
lib/fluent/plugin/filter_throttle.rb

Defined Under Namespace

Classes: Group

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/fluent/plugin/filter_throttle.rb', line 46

# Validates the throttle settings and derives the values used while filtering.
#
# Derived state:
# * @group_key_paths    - every entry of group_key split on "." into segments
# * @group_gc_timeout_s - twice the bucket period
# * @group_rate_limit   - group_bucket_limit divided by group_bucket_period_s
# * @group_reset_rate_s - falls back to @group_rate_limit when left nil
#
# @param conf [Object] not read here; it only reaches the parent through super
# @raise [RuntimeError] when a setting is outside its allowed range
def configure(conf)
  super

  @group_key_paths = group_key.map { |dotted| dotted.split(".") }

  # The period is checked first because it is used as a divisor further down.
  unless @group_bucket_period_s > 0
    raise "group_bucket_period_s must be > 0"
  end
  @group_gc_timeout_s = 2 * @group_bucket_period_s

  unless @group_bucket_limit > 0
    raise "group_bucket_limit must be > 0"
  end
  # NOTE(review): if both settings are Integers this division truncates
  # (e.g. 10 / 60 == 0) - confirm the declared parameter types.
  @group_rate_limit = @group_bucket_limit / @group_bucket_period_s

  # An unset reset rate defaults to the computed rate limit.
  @group_reset_rate_s = @group_rate_limit if @group_reset_rate_s.nil?

  unless @group_reset_rate_s >= -1
    raise "group_reset_rate_s must be >= -1"
  end
  unless @group_reset_rate_s <= @group_rate_limit
    raise "group_reset_rate_s must be <= group_bucket_limit / group_bucket_period_s"
  end

  unless @group_warning_delay_s >= 1
    raise "group_warning_delay_s must be >= 1"
  end
end

#filter(tag, time, record) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/fluent/plugin/filter_throttle.rb', line 84

# Applies per-group throttling to a single record.
#
# Every group (as returned by extract_group) owns a Group counter in @counters
# that tracks an approximate rate and a per-period bucket count. A bucket_count
# of -1 marks a group whose credit for the period is exhausted.
#
# @param tag [Object] not used by this method
# @param time [Object] not used by this method; Time.now is read instead
# @param record [Object] the record to let through or throttle
# @return [Object, nil] record when it is allowed; when throttled, nil if
#   @group_drop_logs is truthy, otherwise the record itself
def filter(tag, time, record)
  now = Time.now
  # Value handed back for a throttled record: nil makes the record drop.
  throttled_result = @group_drop_logs ? nil : record
  group = extract_group(record)

  # Hashes keep insertion order, so removing and re-adding the entry moves the
  # group to the end of @counters (most recently used).
  counter = @counters.delete(group) || Group.new(0, now, 0, 0, now, nil)
  @counters[group] = counter

  counter.rate_count += 1
  elapsed = now - counter.rate_last_reset
  if elapsed >= 1
    # Recompute and store the rate/s at most once per second.
    counter.aprox_rate = (counter.rate_count / elapsed).round
    counter.rate_count = 0
    counter.rate_last_reset = now
  end

  # The first entry is the least recently used group; evict it once stale.
  oldest_group, oldest_counter = @counters.first
  stale = !oldest_group.nil? && now - oldest_counter.rate_last_reset > @group_gc_timeout_s
  @counters.delete(oldest_group) if stale

  current_period = now.to_i / @group_bucket_period_s
  previous_period = counter.bucket_last_reset.to_i / @group_bucket_period_s
  throttled = counter.bucket_count == -1

  if current_period > previous_period
    # A new time period started. When enabled, a throttled group must first
    # see its rate drop below the reset rate before it gets fresh credit.
    if throttled && @group_reset_rate_s != -1
      unless counter.aprox_rate < @group_reset_rate_s
        log_rate_limit_exceeded(now, group, counter)
        return throttled_result
      end
      log_rate_back_down(now, group, counter)
    end

    # Fresh credit for the new period.
    counter.bucket_count = 0
    counter.bucket_last_reset = now
  elsif throttled
    # Same period and the credit is already exhausted.
    log_rate_limit_exceeded(now, group, counter)
    return throttled_result
  end

  counter.bucket_count += 1
  return record unless counter.bucket_count > @group_bucket_limit

  # Out of credit: mark the group throttled for the rest of the period.
  log_rate_limit_exceeded(now, group, counter)
  counter.bucket_count = -1
  throttled_result
end

#shutdown ⇒ Object



79
80
81
82
# File 'lib/fluent/plugin/filter_throttle.rb', line 79

# Writes a debug-level summary of the per-group counters, then hands over to
# the parent implementation.
def shutdown
  summary = "counters summary: #{@counters}"
  log.debug(summary)
  super
end

#start ⇒ Object



73
74
75
76
77
# File 'lib/fluent/plugin/filter_throttle.rb', line 73

# Runs the parent start logic, then (re)initializes the per-group counter table.
def start
  super

  # Empty hash assigned on every start; filter looks counters up by group here.
  @counters = {}
end