Class: Prop::LeakyBucketStrategy

Inherits:
Object
  • Object
show all
Defined in:
lib/prop/leaky_bucket_strategy.rb

Class Method Summary collapse

Class Method Details

.at_threshold?(counter, options) ⇒ Boolean

Returns: (Boolean)



35
36
37
# File 'lib/prop/leaky_bucket_strategy.rb', line 35

# Tells whether the bucket has filled up to the configured burst rate.
#
# @param counter [Hash] bucket state; +counter[:bucket]+ is coerced with +to_i+,
#   so a missing (nil) level counts as 0
# @param options [Hash] must contain +:burst_rate+ (looked up with +fetch+)
# @return [Boolean] true when the level is greater than or equal to the burst rate
def at_threshold?(counter, options)
  level = counter[:bucket].to_i
  limit = options.fetch(:burst_rate)

  level >= limit
end

.build(options) ⇒ Object



39
40
41
42
43
44
45
46
# File 'lib/prop/leaky_bucket_strategy.rb', line 39

# Builds the cache key under which a leaky bucket is stored.
#
# The handle and key are normalized together by Prop::Key.normalize, hashed
# with MD5, and the hex digest is appended to the "prop/leaky_bucket/" prefix.
#
# @param options [Hash] must contain +:key+ and +:handle+ (both looked up with +fetch+)
# @return [String] "prop/leaky_bucket/<md5 hex digest>"
def build(options)
  # Fetch :key before :handle so a missing :key is reported first.
  key = options.fetch(:key)
  handle = options.fetch(:handle)

  digest = Digest::MD5.hexdigest(Prop::Key.normalize([handle, key]))
  format("prop/leaky_bucket/%s", digest)
end

.counter(cache_key, options) ⇒ Object



21
22
23
# File 'lib/prop/leaky_bucket_strategy.rb', line 21

# Refreshes the bucket for +cache_key+ and returns it with the burst rate attached.
#
# +options[:interval]+ and +options[:threshold]+ are handed to update_bucket
# (the threshold is passed in its +leak_rate+ position); the hash it returns is
# merged with +burst_rate: options[:burst_rate]+.
#
# @param cache_key [String] key of the bucket in the cache
# @param options [Hash] reads +:interval+, +:threshold+ and +:burst_rate+
# @return [Hash] a new hash: the updated bucket plus +:burst_rate+
def counter(cache_key, options)
  bucket = update_bucket(cache_key, options[:interval], options[:threshold])
  bucket.merge(burst_rate: options[:burst_rate])
end

.default_bucket ⇒ Object



48
49
50
# File 'lib/prop/leaky_bucket_strategy.rb', line 48

# State of a bucket that has never been touched: empty, with a zero timestamp.
#
# @return [Hash] a fresh +{ bucket: 0, last_updated: 0 }+ hash on every call
def default_bucket
  { bucket: 0, last_updated: 0 }
end

.increment(cache_key, options, counter) ⇒ Object



25
26
27
28
29
# File 'lib/prop/leaky_bucket_strategy.rb', line 25

# Adds to the bucket and writes the new state through Prop::Limiter.writer.
#
# @param cache_key [String] key of the bucket in the cache
# @param options [Hash] optional +:increment+ gives the amount to add (defaults to 1)
# @param counter [Hash] current bucket state; +counter[:bucket]+ is coerced with +to_i+
# @return [Object] whatever the writer call returns
def increment(cache_key, options, counter)
  step = options.fetch(:increment, 1)
  updated = {
    bucket: counter[:bucket].to_i + step,
    last_updated: Time.now.to_i # the write is stamped with the current time
  }

  Prop::Limiter.writer.call(cache_key, updated)
end

.reset(cache_key) ⇒ Object



31
32
33
# File 'lib/prop/leaky_bucket_strategy.rb', line 31

# Overwrites the bucket stored under +cache_key+ with the default (empty) bucket.
#
# @param cache_key [String] key of the bucket in the cache
# @return [Object] whatever the writer call returns
def reset(cache_key)
  writer = Prop::Limiter.writer
  writer.call(cache_key, default_bucket)
end

.threshold_reached(options) ⇒ Object



52
53
54
55
56
57
# File 'lib/prop/leaky_bucket_strategy.rb', line 52

# Formats the message describing an exceeded leaky-bucket limit.
#
# @param options [Hash] must contain +:burst_rate+ and +:threshold+ (looked up
#   with +fetch+); +:handle+, +:interval+, +:key+ and +:cache_key+ are read
#   with +[]+ and may be absent
# @return [String] the human-readable message
def threshold_reached(options)
  # Fetch :burst_rate before :threshold so a missing :burst_rate is reported first.
  burst = options.fetch(:burst_rate)
  limit = options.fetch(:threshold)
  handle, interval, key, cache_key = options.values_at(:handle, :interval, :key, :cache_key)

  "#{handle} threshold of #{limit} tries per #{interval}s and burst rate #{burst} tries " \
    "exceeded for key '#{key.inspect}', hash #{cache_key}"
end

.update_bucket(cache_key, interval, leak_rate) ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
# File 'lib/prop/leaky_bucket_strategy.rb', line 9

# Applies the leak that has accrued since the bucket was last stamped, writes
# the bucket back through Prop::Limiter.writer and returns it.
#
# The bucket is read through Prop::Limiter.reader; when nothing is stored the
# default bucket is used. The hash that was read is modified in place.
#
# @param cache_key [String] key of the bucket in the cache
# @param interval [Numeric] length of one leak period, in seconds
# @param leak_rate [Numeric] amount drained from the bucket per +interval+
# @return [Hash] the updated bucket (+:bucket+ and +:last_updated+)
def update_bucket(cache_key, interval, leak_rate)
  bucket = Prop::Limiter.reader.call(cache_key) || default_bucket
  now = Time.now.to_i
  elapsed = now - bucket[:last_updated]

  # Multiply before dividing: with Integer arguments `elapsed / interval`
  # truncates to whole intervals, so a bucket read more often than once per
  # interval would never drain. Clamp at zero so a timestamp that lies in the
  # future (clock skew between hosts) cannot turn the leak into a refill.
  leak_amount = [elapsed * leak_rate / interval, 0].max

  bucket[:bucket] = [bucket[:bucket] - leak_amount, 0].max

  # Move the timestamp only when some leak was applied, so time that has not
  # yet amounted to a whole unit keeps accumulating across reads. A timestamp
  # ahead of the local clock is pulled back to now.
  bucket[:last_updated] = now if leak_amount > 0 || elapsed < 0

  Prop::Limiter.writer.call(cache_key, bucket)
  bucket
end

.validate_options!(options) ⇒ Object



59
60
61
62
63
64
65
# File 'lib/prop/leaky_bucket_strategy.rb', line 59

# Validates the options for a leaky-bucket limit.
#
# Delegates first to Prop::IntervalStrategy.validate_options!, then checks
# +:burst_rate+: it must be an Integer and must not be smaller than
# +:threshold+. A burst rate equal to the threshold is accepted, even though
# the error message says "larger".
#
# @param options [Hash] reads +:burst_rate+ and +:threshold+
# @return [nil] when the options are valid
# @raise [ArgumentError] when +:burst_rate+ is not an Integer or is below +:threshold+
def validate_options!(options)
  Prop::IntervalStrategy.validate_options!(options)

  burst_rate = options[:burst_rate]

  # Integer rather than Fixnum: Fixnum was deprecated in Ruby 2.4 and removed
  # in Ruby 3.2, where referencing it raises NameError.
  if !burst_rate.is_a?(Integer) || burst_rate < options[:threshold]
    raise ArgumentError, ":burst_rate must be an Integer and larger than :threshold"
  end
end