Class: Prop::LeakyBucketStrategy
- Inherits: Object
  - Object
    - Prop::LeakyBucketStrategy
- Defined in: lib/prop/leaky_bucket_strategy.rb
Class Method Summary
- .build(options) ⇒ Object
- .change(cache_key, options) ⇒ Object
  WARNING: race condition. This increment is not atomic, so it might miss counts when used frequently.
- .compare_threshold?(counter, operator, options) ⇒ Boolean
- .counter(cache_key, options) ⇒ Object
- .reset(cache_key) ⇒ Object
- .threshold_reached(options) ⇒ Object
- .validate_options!(options) ⇒ Object
- .zero_counter ⇒ Object
Class Method Details
.build(options) ⇒ Object
    # File 'lib/prop/leaky_bucket_strategy.rb', line 38

    def build(options)
      key = options.fetch(:key)
      handle = options.fetch(:handle)
      cache_key = Prop::Key.normalize([ handle, key ])
      "prop/leaky_bucket/#{Digest::MD5.hexdigest(cache_key)}"
    end
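To illustrate the shape of the key that .build returns, here is a minimal sketch that runs without the gem. `normalize_key` is a hypothetical stand-in for `Prop::Key.normalize` (assumed here to join the parts with `/`; the real method may behave differently), and the handle and key values are made up.

```ruby
require "digest/md5"

# Hypothetical stand-in for Prop::Key.normalize; the real implementation may differ.
def normalize_key(parts)
  Array(parts).flatten.join("/")
end

# Mirrors the shape of .build: a fixed namespace prefix plus the MD5 of the normalized key.
def build_cache_key(options)
  key = options.fetch(:key)
  handle = options.fetch(:handle)
  "prop/leaky_bucket/#{Digest::MD5.hexdigest(normalize_key([handle, key]))}"
end

cache_key = build_cache_key(handle: :api_requests, key: "user-42")
puts cache_key
```

Because both values are read with `fetch`, a missing `:key` or `:handle` raises `KeyError` instead of silently producing a key from `nil`.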
.change(cache_key, options) ⇒ Object
WARNING: race condition. This increment is not atomic, so it might miss counts when used frequently.
    # File 'lib/prop/leaky_bucket_strategy.rb', line 20

    def change(cache_key, options)
      # please open a PR if you know how to support this
      raise ArgumentError, "decrement is not supported for LeakyBucketStrategy" if options.key?(:decrement)

      counter = counter(cache_key, options)
      counter[:bucket] += options.fetch(:increment, 1)
      Prop::Limiter.cache.write(cache_key, counter)
      counter
    end
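The race described in the warning comes from reading, modifying, and writing the counter as separate steps. The sketch below replays one unlucky interleaving of two clients deterministically, using a plain Hash as a hypothetical stand-in for the cache; it does not use Prop itself.

```ruby
# In-memory stand-in for the cache (hypothetical; Prop uses whatever store is configured).
cache = { "bucket" => { bucket: 0 } }

# Each client performs read -> modify -> write as separate steps, as .change does.
read_a = cache["bucket"].dup   # client A reads bucket: 0
read_b = cache["bucket"].dup   # client B reads bucket: 0 before A has written

read_a[:bucket] += 1
cache["bucket"] = read_a       # A writes bucket: 1

read_b[:bucket] += 1
cache["bucket"] = read_b       # B overwrites with bucket: 1, so A's increment is lost

puts cache["bucket"][:bucket]  # prints 1, although two increments were attempted
```

Under this interleaving the bucket undercounts, so a busy key can be throttled later than its configured limits suggest.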
.compare_threshold?(counter, operator, options) ⇒ Boolean
    # File 'lib/prop/leaky_bucket_strategy.rb', line 34

    def compare_threshold?(counter, operator, options)
      counter.fetch(:bucket).to_i.send operator, options.fetch(:burst_rate)
    end
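The comparison is dispatched dynamically: `operator` is a Symbol naming an Integer comparison method, and the bucket level is compared against `:burst_rate`. A minimal standalone sketch follows; the method name `over_burst?` and the sample values are hypothetical, and which operators Prop actually passes is not shown in this listing.

```ruby
# Same comparison as compare_threshold?, written as a standalone method for illustration.
def over_burst?(counter, operator, options)
  counter.fetch(:bucket).to_i.send(operator, options.fetch(:burst_rate))
end

options = { burst_rate: 10 }
puts over_burst?({ bucket: 9 }, :>=, options)   # false
puts over_burst?({ bucket: 10 }, :>=, options)  # true
puts over_burst?({ bucket: 10 }, :>, options)   # false
```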
.counter(cache_key, options) ⇒ Object
    # File 'lib/prop/leaky_bucket_strategy.rb', line 8

    def counter(cache_key, options)
      bucket = Prop::Limiter.cache.read(cache_key) || zero_counter
      now = Time.now.to_i
      leak_amount = (now - bucket.fetch(:last_updated)) / options.fetch(:interval) * options.fetch(:threshold)
      bucket[:bucket] = [bucket.fetch(:bucket) - leak_amount, 0].max
      bucket[:last_updated] = now
      bucket
    end
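A worked example of the leak arithmetic may help. The sketch below isolates the calculation from the cache and the clock; the method name `leak` and all sample values are hypothetical, and integer `interval` and `threshold` values are assumed.

```ruby
# Standalone version of the leak step in .counter (no cache or Time.now involved).
def leak(bucket, now, interval:, threshold:)
  leak_amount = (now - bucket.fetch(:last_updated)) / interval * threshold
  { bucket: [bucket.fetch(:bucket) - leak_amount, 0].max, last_updated: now }
end

state = { bucket: 10, last_updated: 100 }
p leak(state, 125, interval: 10, threshold: 2)    # 25s elapsed: 2 full intervals, leaks 4
p leak(state, 109, interval: 10, threshold: 2)    # 9s elapsed: 0 full intervals, leaks nothing
p leak(state, 1_000, interval: 10, threshold: 2)  # leak exceeds the contents, clamps at 0
```

With integer operands the division truncates, so elapsed time shorter than one full interval leaks nothing.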
.reset(cache_key) ⇒ Object
    # File 'lib/prop/leaky_bucket_strategy.rb', line 30

    def reset(cache_key)
      Prop::Limiter.cache.write(cache_key, zero_counter)
    end
.threshold_reached(options) ⇒ Object
    # File 'lib/prop/leaky_bucket_strategy.rb', line 47

    def threshold_reached(options)
      burst_rate = options.fetch(:burst_rate)
      threshold = options.fetch(:threshold)

      "#{options[:handle]} threshold of #{threshold} tries per #{options[:interval]}s and burst rate #{burst_rate} tries exceeded for key #{options[:key].inspect}, hash #{options[:cache_key]}"
    end
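For reference, this is roughly what the resulting message looks like. The sketch copies the interpolation into a standalone method; the name `threshold_message` and every option value are hypothetical.

```ruby
# Standalone copy of the message template used by threshold_reached.
def threshold_message(options)
  burst_rate = options.fetch(:burst_rate)
  threshold = options.fetch(:threshold)
  "#{options[:handle]} threshold of #{threshold} tries per #{options[:interval]}s " \
    "and burst rate #{burst_rate} tries exceeded for key #{options[:key].inspect}, " \
    "hash #{options[:cache_key]}"
end

message = threshold_message(
  handle: :login, threshold: 5, interval: 60, burst_rate: 10,
  key: "1.2.3.4", cache_key: "prop/leaky_bucket/abc"
)
puts message
# login threshold of 5 tries per 60s and burst rate 10 tries exceeded for key "1.2.3.4", hash prop/leaky_bucket/abc
```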
.validate_options!(options) ⇒ Object
    # File 'lib/prop/leaky_bucket_strategy.rb', line 54

    def validate_options!(options)
      Prop::IntervalStrategy.validate_options!(options)

      if !options[:burst_rate].is_a?(Fixnum) || options[:burst_rate] < options[:threshold]
        raise ArgumentError.new(":burst_rate must be an Integer and not less than :threshold")
      end

      if options[:first_throttled]
        raise ArgumentError.new(":first_throttled is not supported")
      end
    end
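The two strategy-specific checks can be tried in isolation. The sketch below is a standalone approximation, not the gem's method: the name `check_leaky_bucket_options` is hypothetical, the delegated `Prop::IntervalStrategy` validation is left out, and `Integer` is used in place of `Fixnum` because the `Fixnum` constant was removed in Ruby 3.2.

```ruby
# Standalone sketch of the :burst_rate and :first_throttled checks only;
# the delegated Prop::IntervalStrategy validation is not reproduced here.
def check_leaky_bucket_options(options)
  burst_rate = options[:burst_rate]
  if !burst_rate.is_a?(Integer) || burst_rate < options[:threshold]
    raise ArgumentError, ":burst_rate must be an Integer and not less than :threshold"
  end
  raise ArgumentError, ":first_throttled is not supported" if options[:first_throttled]
  true
end

check_leaky_bucket_options(threshold: 5, interval: 10, burst_rate: 20)  # passes

begin
  check_leaky_bucket_options(threshold: 5, interval: 10, burst_rate: 3)
rescue ArgumentError => e
  puts e.message  # :burst_rate must be an Integer and not less than :threshold
end
```

Because the type check runs first and short-circuits, a missing or non-integer `:burst_rate` is rejected before the comparison with `:threshold` is attempted.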
.zero_counter ⇒ Object
    # File 'lib/prop/leaky_bucket_strategy.rb', line 66

    def zero_counter
      { bucket: 0, last_updated: 0 }
    end