Class: Prop::LeakyBucketStrategy
- Inherits:
-
Object
- Object
- Prop::LeakyBucketStrategy
- Defined in:
- lib/prop/leaky_bucket_strategy.rb
Class Method Summary collapse
- ._throttle_leaky_bucket(handle, key, cache_key, options) ⇒ Object
- .build(options) ⇒ Object
- .cache ⇒ Object
- .compare_threshold?(bucket, operator, options) ⇒ Boolean
- .counter(cache_key, options) ⇒ Object
- .decrement(cache_key, amount, options) ⇒ Object
-
.increment(cache_key, amount, options) ⇒ Object
WARNING: race condition this increment is not atomic, so it might miss counts when used frequently.
- .leak_amount(bucket, amount, options, now) ⇒ Object
- .reset(cache_key, options = {}) ⇒ Object
- .threshold_reached(options) ⇒ Object
- .update_bucket(current_bucket_size, max_bucket_size, amount) ⇒ Object
- .validate_options!(options) ⇒ Object
- .zero_counter ⇒ Object
Class Method Details
._throttle_leaky_bucket(handle, key, cache_key, options) ⇒ Object
8 9 10 11 12 13 14 |
# File 'lib/prop/leaky_bucket_strategy.rb', line 8 def _throttle_leaky_bucket(handle, key, cache_key, ) (over_limit, bucket) = .key?(:decrement) ? decrement(cache_key, .fetch(:decrement), ) : increment(cache_key, .fetch(:increment, 1), ) [over_limit, bucket] end |
.build(options) ⇒ Object
73 74 75 76 77 78 79 80 |
# File 'lib/prop/leaky_bucket_strategy.rb', line 73 def build() key = .fetch(:key) handle = .fetch(:handle) cache_key = Prop::Key.normalize([ handle, key ]) "prop/leaky_bucket/#{Digest::MD5.hexdigest(cache_key)}" end |
.cache ⇒ Object
105 106 107 |
# File 'lib/prop/leaky_bucket_strategy.rb', line 105 def cache Prop::Limiter.cache end |
.compare_threshold?(bucket, operator, options) ⇒ Boolean
69 70 71 |
# File 'lib/prop/leaky_bucket_strategy.rb', line 69 def compare_threshold?(bucket, operator, ) bucket.fetch(:over_limit, false) end |
.counter(cache_key, options) ⇒ Object
16 17 18 |
# File 'lib/prop/leaky_bucket_strategy.rb', line 16 def counter(cache_key, ) cache.read(cache_key) || zero_counter end |
.decrement(cache_key, amount, options) ⇒ Object
54 55 56 57 58 59 60 61 62 63 |
# File 'lib/prop/leaky_bucket_strategy.rb', line 54 def decrement(cache_key, amount, ) now = Time.now.to_i bucket = counter(cache_key, ) leak_amount = leak_amount(bucket, amount, , now) bucket[:bucket] = [bucket[:bucket] - amount - leak_amount, 0].max bucket[:last_leak_time] = now if leak_amount > 0 bucket[:over_limit] = false cache.write(cache_key, bucket) [false, bucket] end |
.increment(cache_key, amount, options) ⇒ Object
WARNING: race condition this increment is not atomic, so it might miss counts when used frequently
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/prop/leaky_bucket_strategy.rb', line 34 def increment(cache_key, amount, ) bucket = counter(cache_key, ) now = Time.now.to_i max_bucket_size = .fetch(:burst_rate) current_bucket_size = bucket.fetch(:bucket, 0) leak_amount = leak_amount(bucket, amount, , now) if leak_amount > 0 # maybe TODO, update last_leak_time to reflect the exact time for the current leak amount # the current strategy will always reflect a little less leakage, probably not an issue though bucket[:last_leak_time] = now current_bucket_size = [(current_bucket_size - leak_amount), 0].max end over_limit, updated_bucket_size = update_bucket(current_bucket_size, max_bucket_size, amount) bucket[:bucket] = updated_bucket_size bucket[:over_limit] = over_limit cache.write(cache_key, bucket) [over_limit, bucket] end |
.leak_amount(bucket, amount, options, now) ⇒ Object
20 21 22 23 24 |
# File 'lib/prop/leaky_bucket_strategy.rb', line 20 def leak_amount(bucket, amount, , now) leak_rate = (now - bucket.fetch(:last_leak_time, 0)) / .fetch(:interval).to_f leak_amount = (leak_rate * .fetch(:threshold).to_f) leak_amount.to_i end |
.reset(cache_key, options = {}) ⇒ Object
65 66 67 |
# File 'lib/prop/leaky_bucket_strategy.rb', line 65 def reset(cache_key, = {}) cache.write(cache_key, zero_counter, raw: true) end |
.threshold_reached(options) ⇒ Object
82 83 84 85 86 87 |
# File 'lib/prop/leaky_bucket_strategy.rb', line 82 def threshold_reached() burst_rate = .fetch(:burst_rate) threshold = .fetch(:threshold) "#{options[:handle]} threshold of #{threshold} tries per #{options[:interval]}s and burst rate #{burst_rate} tries exceeded for key #{options[:key].inspect}, hash #{options[:cache_key]}" end |
.update_bucket(current_bucket_size, max_bucket_size, amount) ⇒ Object
26 27 28 29 30 |
# File 'lib/prop/leaky_bucket_strategy.rb', line 26 def update_bucket(current_bucket_size, max_bucket_size, amount) over_limit = (max_bucket_size-current_bucket_size) < amount updated_bucket_size = over_limit ? current_bucket_size : current_bucket_size + amount [over_limit, updated_bucket_size] end |
.validate_options!(options) ⇒ Object
89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/prop/leaky_bucket_strategy.rb', line 89 def () Prop::IntervalStrategy.() if ![:burst_rate].is_a?(Integer) || [:burst_rate] < [:threshold] raise ArgumentError.new(":burst_rate must be an Integer and not less than :threshold") end if [:first_throttled] raise ArgumentError.new(":first_throttled is not supported") end end |
.zero_counter ⇒ Object
101 102 103 |
# File 'lib/prop/leaky_bucket_strategy.rb', line 101 def zero_counter { bucket: 0, last_leak_time: 0, over_limit: false } end |