Class: Sidekiq::QueueThrottled::QueueLimiter

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/queue_throttled/queue_limiter.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue_name, limit, redis = nil) ⇒ QueueLimiter

Returns a new instance of QueueLimiter.



8
9
10
11
12
13
14
15
# File 'lib/sidekiq/queue_throttled/queue_limiter.rb', line 8

# Builds a limiter for a single queue.
#
# @param queue_name [#to_s] queue name; stored as a String
# @param limit [#to_i] limit for the queue; stored as an Integer
# @param redis [Object, nil] Redis client to use; when nil,
#   Sidekiq::QueueThrottled.redis is used instead
def initialize(queue_name, limit, redis = nil)
  @queue_name = queue_name.to_s
  @limit = limit.to_i
  @redis = redis || Sidekiq::QueueThrottled.redis

  # Both Redis keys share the "<prefix>:queue:<queue_name>" base.
  key_base = "#{Sidekiq::QueueThrottled.configuration.redis_key_prefix}:queue:#{@queue_name}"
  @lock_key = "#{key_base}:lock"
  @counter_key = "#{key_base}:counter"

  @mutex = Concurrent::ReentrantReadWriteLock.new
end

Instance Attribute Details

#limit ⇒ Object (readonly)

Returns the value of attribute limit.



6
7
8
# File 'lib/sidekiq/queue_throttled/queue_limiter.rb', line 6

# Reader for the limit given to the constructor.
#
# @return [Integer] the limit, as coerced with +to_i+ by the constructor
def limit
  @limit
end

#queue_name ⇒ Object (readonly)

Returns the value of attribute queue_name.



6
7
8
# File 'lib/sidekiq/queue_throttled/queue_limiter.rb', line 6

# Reader for the queue name given to the constructor.
#
# @return [String] the queue name, as coerced with +to_s+ by the constructor
def queue_name
  @queue_name
end

#redis ⇒ Object (readonly)

Returns the value of attribute redis.



6
7
8
# File 'lib/sidekiq/queue_throttled/queue_limiter.rb', line 6

# Reader for the Redis client this limiter talks to.
#
# @return [Object] the client passed to the constructor, or
#   Sidekiq::QueueThrottled.redis when none was given
def redis
  @redis
end

Instance Method Details

#acquire_lock?(worker_id = nil) ⇒ Boolean

Returns:

  • (Boolean)


17
18
19
20
21
22
23
24
25
26
# File 'lib/sidekiq/queue_throttled/queue_limiter.rb', line 17

# Tries to claim one slot on this queue.
#
# The limit check and the counter increment run together while holding this
# instance's write lock. Despite the trailing "?", a successful call returns
# the generated lock id rather than +true+.
#
# @param worker_id [String, nil] identifier embedded in the lock id; a random
#   UUID is generated when omitted
# @return [String, false] "<worker_id>:<timestamp>" when a slot was claimed,
#   +false+ when +limit_reached?+ reports the queue is full
def acquire_lock?(worker_id = nil)
  owner = worker_id || SecureRandom.uuid
  lock_id = "#{owner}:#{Time.now.to_f}"

  @mutex.with_write_lock do
    if limit_reached?
      false
    else
      increment_counter
      lock_id
    end
  end
end

#available_slots ⇒ Object



41
42
43
# File 'lib/sidekiq/queue_throttled/queue_limiter.rb', line 41

# Number of slots still free on this queue.
#
# @return [Integer] the limit minus +current_count+, floored at zero so an
#   over-subscribed queue reports 0 rather than a negative number
def available_slots
  remaining = @limit - current_count
  remaining.positive? ? remaining : 0
end

#current_count ⇒ Object



37
38
39
# File 'lib/sidekiq/queue_throttled/queue_limiter.rb', line 37

# Current counter value for this queue.
#
# The value comes from +fetch_current_count+, called while holding this
# instance's read lock.
#
# @return [Object] whatever +fetch_current_count+ returns
#   (presumably an Integer; confirm against that helper)
def current_count
  @mutex.with_read_lock do
    fetch_current_count
  end
end

#release_lock(lock_id) ⇒ Object



28
29
30
31
32
33
34
35
# File 'lib/sidekiq/queue_throttled/queue_limiter.rb', line 28

# Releases a lock previously handed out by this limiter.
#
# NOTE(review): as written, this only takes and drops the write lock and
# returns +true+; it does not touch Redis or the counter. Confirm the counter
# is released elsewhere (e.g. by expiry), otherwise slots are never freed.
#
# @param lock_id [String, nil] lock id to release
# @return [Boolean] +false+ when +lock_id+ is nil/false or an error was
#   raised (the error is logged, not propagated); +true+ otherwise
def release_lock(lock_id)
  if lock_id
    @mutex.with_write_lock { true }
  else
    false
  end
rescue StandardError => error
  Sidekiq::QueueThrottled.logger.error "Failed to release lock #{lock_id} for queue #{@queue_name}: #{error.message}"
  false
end

#reset! ⇒ Object



45
46
47
48
49
50
51
52
# File 'lib/sidekiq/queue_throttled/queue_limiter.rb', line 45

# Clears this queue's throttling state in Redis.
#
# While holding the write lock, deletes the counter key and then every key
# matching "<prefix>:queue:<queue_name>:lock:*".
#
# Glob metacharacters (\ * ? [ ]) in the prefix or queue name are
# backslash-escaped before building the KEYS pattern, so a queue named e.g.
# "a*" only matches its own lock keys instead of wiping those of "abc" too.
#
# NOTE(review): KEYS walks the whole keyspace and can block Redis on large
# datasets; consider SCAN if the configured client supports it.
#
# @return [Object, nil] the result of the final +del+ call, or +nil+ when no
#   lock keys matched
def reset!
  @mutex.with_write_lock do
    @redis.del(@counter_key)

    lock_base = "#{Sidekiq::QueueThrottled.configuration.redis_key_prefix}:queue:#{@queue_name}:lock"
    # Escape glob syntax so the literal key base is matched verbatim.
    escaped_base = lock_base.gsub(/[\\*?\[\]]/) { |char| "\\#{char}" }
    keys = @redis.keys("#{escaped_base}:*")
    @redis.del(*keys) unless keys.empty?
  end
end