Class: Prop::LeakyBucketStrategy

Inherits:
Object
  • Object
show all
Defined in:
lib/prop/leaky_bucket_strategy.rb

Class Method Summary collapse

Class Method Details

._throttle_leaky_bucket(handle, key, cache_key, options) ⇒ Object



8
9
10
11
12
13
14
# File 'lib/prop/leaky_bucket_strategy.rb', line 8

# Applies a single throttle operation to the bucket stored under +cache_key+.
#
# When +options+ contains a +:decrement+ key the bucket is decremented by that
# amount; otherwise it is incremented by +options[:increment]+ (1 if absent).
#
# @param handle [Object] not used by this method
# @param key [Object] not used by this method
# @param cache_key [Object] cache key forwarded to increment/decrement
# @param options [Hash] throttle options, forwarded unchanged
# @return [Array] two elements: the over-limit flag and the bucket
def _throttle_leaky_bucket(handle, key, cache_key, options)
  result =
    if options.key?(:decrement)
      decrement(cache_key, options.fetch(:decrement), options)
    else
      increment(cache_key, options.fetch(:increment, 1), options)
    end

  # Destructure and rebuild so the return value is always a fresh pair.
  over_limit, bucket = result
  [over_limit, bucket]
end

.build(options) ⇒ Object



73
74
75
76
77
78
79
80
# File 'lib/prop/leaky_bucket_strategy.rb', line 73

# Builds the cache key under which a leaky bucket is stored.
#
# The handle and key are normalized via Prop::Key.normalize and the result is
# MD5-hashed, then prefixed with "prop/leaky_bucket/".
#
# @param options [Hash] must contain +:key+ and +:handle+
# @return [String] the namespaced cache key
# @raise [KeyError] if +:key+ or +:handle+ is missing
def build(options)
  key    = options.fetch(:key)
  handle = options.fetch(:handle)

  normalized = Prop::Key.normalize([handle, key])
  digest     = Digest::MD5.hexdigest(normalized)

  "prop/leaky_bucket/#{digest}"
end

.cache ⇒ Object



105
106
107
# File 'lib/prop/leaky_bucket_strategy.rb', line 105

# Returns the cache store used by this strategy by delegating to
# Prop::Limiter.cache.
#
# @return [Object] whatever Prop::Limiter.cache returns
def cache
  Prop::Limiter.cache
end

.compare_threshold?(bucket, operator, options) ⇒ Boolean

Returns:

  • (Boolean)


69
70
71
# File 'lib/prop/leaky_bucket_strategy.rb', line 69

# Reports whether the bucket is flagged as over its limit.
#
# Only the bucket's stored +:over_limit+ value is consulted.
#
# @param bucket [Hash] bucket state; +:over_limit+ is read if present
# @param operator [Object] not used by this method
# @param options [Object] not used by this method
# @return [Boolean] the stored +:over_limit+ value, or false when the key is absent
def compare_threshold?(bucket, operator, options)
  bucket.fetch(:over_limit) { false }
end

.counter(cache_key, options) ⇒ Object



16
17
18
# File 'lib/prop/leaky_bucket_strategy.rb', line 16

# Reads the bucket stored under +cache_key+, falling back to a zeroed counter
# when the cache returns nil or false.
#
# @param cache_key [Object] key passed to the cache's +read+
# @param options [Object] not used by this method
# @return [Object] the cached value, or the result of zero_counter
def counter(cache_key, options)
  stored = cache.read(cache_key)
  return stored if stored

  zero_counter
end

.decrement(cache_key, amount, options) ⇒ Object



54
55
56
57
58
59
60
61
62
63
# File 'lib/prop/leaky_bucket_strategy.rb', line 54

# Removes +amount+ (plus whatever has leaked since the last leak) from the
# bucket stored under +cache_key+ and writes the bucket back to the cache.
#
# The bucket level never drops below zero, and a decrement always clears the
# +:over_limit+ flag. Like increment, this read-modify-write is not atomic.
#
# @param cache_key [Object] key of the bucket in the cache
# @param amount [Numeric] quantity to remove from the bucket
# @param options [Hash] throttle options, forwarded to counter and leak_amount
# @return [Array] +[false, bucket]+ with the updated bucket
def decrement(cache_key, amount, options)
  now    = Time.now.to_i
  bucket = counter(cache_key, options)
  leaked = leak_amount(bucket, amount, options, now)

  remaining = bucket[:bucket] - amount - leaked
  bucket[:bucket] = [remaining, 0].max

  # Only move the leak timestamp forward when something actually leaked.
  if leaked > 0
    bucket[:last_leak_time] = now
  end

  bucket[:over_limit] = false
  cache.write(cache_key, bucket)

  [false, bucket]
end

.increment(cache_key, amount, options) ⇒ Object

WARNING: race condition — this increment is not atomic, so it might miss counts when used frequently.



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/prop/leaky_bucket_strategy.rb', line 34

# Adds +amount+ to the bucket stored under +cache_key+, after first draining
# whatever has leaked since the last recorded leak, then writes the bucket back.
#
# WARNING: the read-modify-write below is not atomic, so concurrent callers
# may lose counts.
#
# @param cache_key [Object] key of the bucket in the cache
# @param amount [Numeric] quantity to add to the bucket
# @param options [Hash] throttle options; +:burst_rate+ is the bucket capacity
# @return [Array] +[over_limit, bucket]+ with the updated bucket
# @raise [KeyError] if +options+ has no +:burst_rate+
def increment(cache_key, amount, options)
  bucket   = counter(cache_key, options)
  now      = Time.now.to_i
  capacity = options.fetch(:burst_rate)
  level    = bucket.fetch(:bucket, 0)
  leaked   = leak_amount(bucket, amount, options, now)

  if leaked > 0
    # maybe TODO: set last_leak_time to the exact instant matching the leaked
    # amount; using +now+ always under-reports leakage slightly, which is
    # probably not an issue.
    bucket[:last_leak_time] = now
    level = [level - leaked, 0].max
  end

  over_limit, new_level = update_bucket(level, capacity, amount)

  bucket[:bucket]     = new_level
  bucket[:over_limit] = over_limit
  cache.write(cache_key, bucket)

  [over_limit, bucket]
end

.leak_amount(bucket, amount, options, now) ⇒ Object



20
21
22
23
24
# File 'lib/prop/leaky_bucket_strategy.rb', line 20

# Computes how much has leaked out of the bucket since its last recorded leak.
#
# The elapsed time (now minus +:last_leak_time+, default 0) is expressed as a
# fraction of +:interval+ and scaled by +:threshold+; the fractional part of
# the result is discarded.
#
# @param bucket [Hash] bucket state; +:last_leak_time+ is read if present
# @param amount [Object] not used by this method
# @param options [Hash] must contain +:interval+ and +:threshold+
# @param now [Integer] current time, in the same unit as +:last_leak_time+
# @return [Integer] the leaked amount, truncated toward zero
# @raise [KeyError] if +:interval+ or +:threshold+ is missing
def leak_amount(bucket, amount, options, now)
  elapsed   = now - bucket.fetch(:last_leak_time, 0)
  intervals = elapsed / options.fetch(:interval).to_f
  leaked    = intervals * options.fetch(:threshold).to_f

  leaked.to_i
end

.reset(cache_key, options = {}) ⇒ Object



65
66
67
# File 'lib/prop/leaky_bucket_strategy.rb', line 65

# Resets the bucket stored under +cache_key+ to a zeroed counter.
#
# The bucket is a Hash, so it is written without +raw: true+, the same way
# increment and decrement write it and counter reads it. A raw write would
# store the Hash as a plain string on cache stores that honour +:raw+, and the
# next read would then not yield a usable bucket.
#
# @param cache_key [Object] key of the bucket in the cache
# @param options [Hash] not used by this method
# @return [Object] whatever the cache's +write+ returns
def reset(cache_key, options = {})
  cache.write(cache_key, zero_counter)
end

.threshold_reached(options) ⇒ Object



82
83
84
85
86
87
# File 'lib/prop/leaky_bucket_strategy.rb', line 82

# Builds the human-readable message describing an exceeded leaky-bucket limit.
#
# @param options [Hash] must contain +:burst_rate+ and +:threshold+; +:handle+,
#   +:interval+, +:key+ and +:cache_key+ are interpolated if present
# @return [String] the message text
# @raise [KeyError] if +:burst_rate+ or +:threshold+ is missing
def threshold_reached(options)
  burst_rate = options.fetch(:burst_rate)
  threshold  = options.fetch(:threshold)
  handle, interval, key, cache_key = options.values_at(:handle, :interval, :key, :cache_key)

  "#{handle} threshold of #{threshold} tries per #{interval}s and burst rate #{burst_rate} tries exceeded for key #{key.inspect}, hash #{cache_key}"
end

.update_bucket(current_bucket_size, max_bucket_size, amount) ⇒ Object



26
27
28
29
30
# File 'lib/prop/leaky_bucket_strategy.rb', line 26

# Decides whether +amount+ still fits into the bucket and computes the new level.
#
# When the free space (max minus current) is smaller than +amount+ the bucket
# is over its limit and its level is left unchanged; otherwise +amount+ is added.
#
# @param current_bucket_size [Numeric] current bucket level
# @param max_bucket_size [Numeric] bucket capacity
# @param amount [Numeric] quantity to add
# @return [Array] +[over_limit, updated_bucket_size]+
def update_bucket(current_bucket_size, max_bucket_size, amount)
  free_space = max_bucket_size - current_bucket_size
  return [true, current_bucket_size] if free_space < amount

  [false, current_bucket_size + amount]
end

.validate_options!(options) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
# File 'lib/prop/leaky_bucket_strategy.rb', line 89

# Validates the options for a leaky-bucket throttle.
#
# Runs Prop::IntervalStrategy.validate_options! first, then checks the
# leaky-bucket specific options.
#
# @param options [Hash] throttle options
# @return [nil] when the options pass every check here
# @raise [ArgumentError] if +:burst_rate+ is not an Integer or is less than
#   +:threshold+, or if +:first_throttled+ is set
def validate_options!(options)
  Prop::IntervalStrategy.validate_options!(options)

  burst_rate = options[:burst_rate]
  if !burst_rate.is_a?(Integer) || burst_rate < options[:threshold]
    raise ArgumentError, ":burst_rate must be an Integer and not less than :threshold"
  end

  raise ArgumentError, ":first_throttled is not supported" if options[:first_throttled]
end

.zero_counter ⇒ Object



101
102
103
# File 'lib/prop/leaky_bucket_strategy.rb', line 101

# Builds the initial state for a bucket that has not been used yet.
#
# A new Hash is returned on every call, so callers may mutate it freely.
#
# @return [Hash] bucket state with an empty bucket, a last leak time of 0
#   and the over-limit flag cleared
def zero_counter
  {
    bucket: 0,
    last_leak_time: 0,
    over_limit: false
  }
end