Class: Sidekiq::Throttler::RateLimit

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/throttler/rate_limit.rb

Overview

Handles the tracking of rate limits.

TODO: Consider reducing threshold and period to smooth out job executions so that "24 jobs every 1 hour" becomes "1 job every 2 minutes and 30 seconds"

Constant Summary collapse

LOCK =
Mutex.new

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(worker, payload, queue, options = {}) ⇒ RateLimit

Returns a new instance of RateLimit.

Parameters:

  • worker (Sidekiq::Worker)

    The worker to rate limit.

  • payload (Array<Object>)

    The message payload for the current job.

  • queue (String)

    The queue to rate limit.

  • [Symbol] (Hash)

    a customizable set of options



42
43
44
45
46
47
48
49
50
51
# File 'lib/sidekiq/throttler/rate_limit.rb', line 42

def initialize(worker, payload, queue, options = {})
  @worker = worker
  @payload = payload
  @queue = queue

  unless @storage_class = lookup_storage(options.fetch(:storage, :memory))
    raise ArgumentError,
      "Unrecognized storage backend: #{options[:storage].inspect}"
  end
end

Instance Attribute Details

#payloadArray (readonly)

Returns The message payload for the current job.

Returns:

  • (Array)

    The message payload for the current job.



23
24
25
# File 'lib/sidekiq/throttler/rate_limit.rb', line 23

def payload
  @payload
end

#queueString (readonly)

Returns The queue to rate limit.

Returns:

  • (String)

    The queue to rate limit.



28
29
30
# File 'lib/sidekiq/throttler/rate_limit.rb', line 28

def queue
  @queue
end

#workerSidekiq::Worker (readonly)

Returns The worker to rate limit.

Returns:

  • (Sidekiq::Worker)

    The worker to rate limit.



18
19
20
# File 'lib/sidekiq/throttler/rate_limit.rb', line 18

def worker
  @worker
end

Instance Method Details

#can_throttle?true, false

Check if rate limiting options were correctly specified on the worker.

Returns:

  • (true, false)


108
109
110
# File 'lib/sidekiq/throttler/rate_limit.rb', line 108

def can_throttle?
  [threshold, period].select(&:zero?).empty?
end

#countInteger

Fetch the number of jobs executed.

Returns:

  • (Integer)

    The current number of jobs executed.



58
59
60
# File 'lib/sidekiq/throttler/rate_limit.rb', line 58

def count
  self.class.count(self)
end

#exceeded {|delay| ... } ⇒ Object

Set a callback to be executed when #execute is called and the rate limit has exceeded the threshold.

Yield Parameters:

  • delay (Integer)

    Delay in seconds to requeue job for.



141
142
143
# File 'lib/sidekiq/throttler/rate_limit.rb', line 141

def exceeded(&block)
  @exceeded = block
end

#exceeded?true, false

Check if rate limit has exceeded the threshold.

Returns:

  • (true, false)


116
117
118
# File 'lib/sidekiq/throttler/rate_limit.rb', line 116

def exceeded?
  count >= threshold
end

#executeObject

Executes a callback (#within_bounds, or #exceeded) depending on the state of the rate limit.



148
149
150
151
152
153
154
155
156
157
# File 'lib/sidekiq/throttler/rate_limit.rb', line 148

def execute
  return @within_bounds.call unless can_throttle?

  if exceeded?
    @exceeded.call(period)
  else
    increment
    @within_bounds.call
  end
end

#executionsObject

Get the storage backend.



167
168
169
# File 'lib/sidekiq/throttler/rate_limit.rb', line 167

def executions
  @storage_class.instance
end

#incrementInteger

Increment the count of jobs executed.

Returns:

  • (Integer)

    The current number of jobs executed.



67
68
69
# File 'lib/sidekiq/throttler/rate_limit.rb', line 67

def increment
  self.class.increment(self)
end

#keyString

Returns The key name used when storing counters for jobs.

Returns:

  • (String)

    The key name used when storing counters for jobs.



96
97
98
99
100
101
102
# File 'lib/sidekiq/throttler/rate_limit.rb', line 96

def key
  @key ||= if options['key']
    options['key'].respond_to?(:call) ? options['key'].call(*payload) : options['key']
  else
    "#{@worker.class.to_s.underscore.gsub('/', ':')}:#{@queue}"
  end
end

#options{String => Float, Integer}

Returns the rate limit options for the current running worker.

Returns:

  • ({String => Float, Integer})


75
76
77
# File 'lib/sidekiq/throttler/rate_limit.rb', line 75

def options
  @options ||= (worker.class.get_sidekiq_options['throttle'] || {}).stringify_keys
end

#periodFloat

Returns The number of seconds in the rate limit period.

Returns:

  • (Float)

    The number of seconds in the rate limit period.



89
90
91
# File 'lib/sidekiq/throttler/rate_limit.rb', line 89

def period
  @period ||= (options['period'].respond_to?(:call) ? options['period'].call(*payload) : options['period']).to_f
end

#reset!Object

Reset the tracking of job executions.



161
162
163
# File 'lib/sidekiq/throttler/rate_limit.rb', line 161

def reset!
  executions.reset
end

#thresholdInteger

Returns The number of jobs that are allowed within the period.

Returns:

  • (Integer)

    The number of jobs that are allowed within the period.



82
83
84
# File 'lib/sidekiq/throttler/rate_limit.rb', line 82

def threshold
  @threshold ||= (options['threshold'].respond_to?(:call) ? options['threshold'].call(*payload) : options['threshold']).to_i
end

#within_bounds(&block) ⇒ Object

Set a callback to be executed when #execute is called and the rate limit has not exceeded the threshold.



131
132
133
# File 'lib/sidekiq/throttler/rate_limit.rb', line 131

def within_bounds(&block)
  @within_bounds = block
end

#within_bounds?true, false

Check if rate limit is within the threshold.

Returns:

  • (true, false)


124
125
126
# File 'lib/sidekiq/throttler/rate_limit.rb', line 124

def within_bounds?
  !exceeded?
end