Module: Resque::Plugins::Throttler

Extended by:
Throttler
Included in:
Throttler
Defined in:
lib/resque/throttler.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.extended(other) ⇒ Object



8
9
10
# File 'lib/resque/throttler.rb', line 8

def self.extended(other)
  other.instance_variable_set(:@rate_limits, {})
end

Instance Method Details

#gc_rate_limit_data_for_queue(queue) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/resque/throttler.rb', line 45

def gc_rate_limit_data_for_queue(queue)
  return unless queue_rate_limited?(queue)

  limit = rate_limit_for(queue)
  queue_key = "throttler:#{queue}_uuids"
  uuids = redis.smembers(queue_key)

  uuids.each do |uuid|
    job_ended_at = redis.hmget("throttler:jobs:#{uuid}", "ended_at")[0]
    if job_ended_at && Time.at(job_ended_at.to_i) < Time.now - limit[:per]
      redis.srem(queue_key, uuid)
      redis.del("throttler:jobs:#{uuid}")
    end
  end
end

#pop(queue) ⇒ Object



12
13
14
15
16
17
18
19
# File 'lib/resque/throttler.rb', line 12

def pop(queue)
  if queue_at_or_over_rate_limit?(queue)
    gc_rate_limit_data_for_queue(queue)
    nil
  else
    super
  end
end

#queue_at_or_over_rate_limit?(queue) ⇒ Boolean

Returns:

  • (Boolean)


37
38
39
40
41
42
43
# File 'lib/resque/throttler.rb', line 37

def queue_at_or_over_rate_limit?(queue)
  if queue_rate_limited?(queue)
    redis.scard("throttler:#{queue}_uuids") >= rate_limit_for(queue)[:at]
  else
    false
  end
end

#queue_rate_limited?(queue) ⇒ Boolean

Returns:

  • (Boolean)


33
34
35
# File 'lib/resque/throttler.rb', line 33

def queue_rate_limited?(queue)
  @rate_limits[queue.to_s]
end

#rate_limit(queue, options = {}) ⇒ Object



21
22
23
24
25
26
27
# File 'lib/resque/throttler.rb', line 21

def rate_limit(queue, options={})
  if options.keys.sort != [:at, :per]
    raise ArgumentError.new("Mising either :at or :per in options") 
  end

  @rate_limits[queue.to_s] = options
end

#rate_limit_for(queue) ⇒ Object



29
30
31
# File 'lib/resque/throttler.rb', line 29

def rate_limit_for(queue)
  @rate_limits[queue.to_s]
end