Class: Prop::LeakyBucketStrategy
- Inherits:
-
Object
- Object
- Prop::LeakyBucketStrategy
- Defined in:
- lib/prop/leaky_bucket_strategy.rb
Class Method Summary collapse
- .build(options) ⇒ Object
- .compare_threshold?(counter, operator, options) ⇒ Boolean
- .counter(cache_key, options) ⇒ Object
- .decrement(cache_key, amount, options) ⇒ Object
-
.increment(cache_key, amount, options) ⇒ Object
WARNING: race condition this increment is not atomic, so it might miss counts when used frequently.
- .reset(cache_key) ⇒ Object
- .threshold_reached(options) ⇒ Object
- .validate_options!(options) ⇒ Object
- .zero_counter ⇒ Object
Class Method Details
.build(options) ⇒ Object
43 44 45 46 47 48 49 50 |
# File 'lib/prop/leaky_bucket_strategy.rb', line 43 def build() key = .fetch(:key) handle = .fetch(:handle) cache_key = Prop::Key.normalize([ handle, key ]) "prop/leaky_bucket/#{Digest::MD5.hexdigest(cache_key)}" end |
.compare_threshold?(counter, operator, options) ⇒ Boolean
39 40 41 |
# File 'lib/prop/leaky_bucket_strategy.rb', line 39 def compare_threshold?(counter, operator, ) counter.fetch(:bucket).to_i.send operator, .fetch(:burst_rate) end |
.counter(cache_key, options) ⇒ Object
8 9 10 11 12 13 14 15 16 |
# File 'lib/prop/leaky_bucket_strategy.rb', line 8 def counter(cache_key, ) bucket = Prop::Limiter.cache.read(cache_key) || zero_counter now = Time.now.to_i leak_amount = (now - bucket.fetch(:last_updated)) / .fetch(:interval) * .fetch(:threshold) bucket[:bucket] = [bucket.fetch(:bucket) - leak_amount, 0].max bucket[:last_updated] = now bucket end |
.decrement(cache_key, amount, options) ⇒ Object
27 28 29 30 31 32 33 |
# File 'lib/prop/leaky_bucket_strategy.rb', line 27 def decrement(cache_key, amount, ) counter = counter(cache_key, ) counter[:bucket] -= amount counter[:bucket] = 0 unless counter[:bucket] > 0 Prop::Limiter.cache.write(cache_key, counter) counter end |
.increment(cache_key, amount, options) ⇒ Object
WARNING: race condition this increment is not atomic, so it might miss counts when used frequently
20 21 22 23 24 25 |
# File 'lib/prop/leaky_bucket_strategy.rb', line 20 def increment(cache_key, amount, ) counter = counter(cache_key, ) counter[:bucket] += amount Prop::Limiter.cache.write(cache_key, counter) counter end |
.reset(cache_key) ⇒ Object
35 36 37 |
# File 'lib/prop/leaky_bucket_strategy.rb', line 35 def reset(cache_key) Prop::Limiter.cache.write(cache_key, zero_counter) end |
.threshold_reached(options) ⇒ Object
52 53 54 55 56 57 |
# File 'lib/prop/leaky_bucket_strategy.rb', line 52 def threshold_reached() burst_rate = .fetch(:burst_rate) threshold = .fetch(:threshold) "#{[:handle]} threshold of #{threshold} tries per #{[:interval]}s and burst rate #{burst_rate} tries exceeded for key #{[:key].inspect}, hash #{[:cache_key]}" end |
.validate_options!(options) ⇒ Object
59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/prop/leaky_bucket_strategy.rb', line 59 def () Prop::IntervalStrategy.() if ![:burst_rate].is_a?(Fixnum) || [:burst_rate] < [:threshold] raise ArgumentError.new(":burst_rate must be an Integer and not less than :threshold") end if [:first_throttled] raise ArgumentError.new(":first_throttled is not supported") end end |
.zero_counter ⇒ Object
71 72 73 |
# File 'lib/prop/leaky_bucket_strategy.rb', line 71 def zero_counter { bucket: 0, last_updated: 0 } end |