Class: Sidekiq::QueueThrottled::QueueLimiter
- Inherits:
-
Object
- Object
- Sidekiq::QueueThrottled::QueueLimiter
- Defined in:
- lib/sidekiq/queue_throttled/queue_limiter.rb
Instance Attribute Summary collapse
-
#limit ⇒ Object
readonly
Returns the value of attribute limit.
-
#queue_name ⇒ Object
readonly
Returns the value of attribute queue_name.
-
#redis ⇒ Object
readonly
Returns the value of attribute redis.
Instance Method Summary collapse
- #acquire_lock?(worker_id = nil) ⇒ Boolean
- #available_slots ⇒ Object
- #current_count ⇒ Object
-
#initialize(queue_name, limit, redis = nil) ⇒ QueueLimiter
constructor
A new instance of QueueLimiter.
- #release_lock(lock_id) ⇒ Object
- #reset! ⇒ Object
Constructor Details
#initialize(queue_name, limit, redis = nil) ⇒ QueueLimiter
Returns a new instance of QueueLimiter.
8 9 10 11 12 13 14 15 |
# File 'lib/sidekiq/queue_throttled/queue_limiter.rb', line 8 def initialize(queue_name, limit, redis = nil) @queue_name = queue_name.to_s @limit = limit.to_i @redis = redis || Sidekiq::QueueThrottled.redis @lock_key = "#{Sidekiq::QueueThrottled.configuration.redis_key_prefix}:queue:#{@queue_name}:lock" @counter_key = "#{Sidekiq::QueueThrottled.configuration.redis_key_prefix}:queue:#{@queue_name}:counter" @mutex = Concurrent::ReentrantReadWriteLock.new end |
Instance Attribute Details
#limit ⇒ Object (readonly)
Returns the value of attribute limit.
6 7 8 |
# File 'lib/sidekiq/queue_throttled/queue_limiter.rb', line 6 def limit @limit end |
#queue_name ⇒ Object (readonly)
Returns the value of attribute queue_name.
6 7 8 |
# File 'lib/sidekiq/queue_throttled/queue_limiter.rb', line 6 def queue_name @queue_name end |
#redis ⇒ Object (readonly)
Returns the value of attribute redis.
6 7 8 |
# File 'lib/sidekiq/queue_throttled/queue_limiter.rb', line 6 def redis @redis end |
Instance Method Details
#acquire_lock?(worker_id = nil) ⇒ Boolean
17 18 19 20 21 22 23 24 25 26 |
# File 'lib/sidekiq/queue_throttled/queue_limiter.rb', line 17 def acquire_lock?(worker_id = nil) worker_id ||= SecureRandom.uuid lock_id = "#{worker_id}:#{Time.now.to_f}" @mutex.with_write_lock do return false if limit_reached? increment_counter lock_id end end |
#available_slots ⇒ Object
41 42 43 |
# File 'lib/sidekiq/queue_throttled/queue_limiter.rb', line 41 def available_slots [0, @limit - current_count].max end |
#current_count ⇒ Object
37 38 39 |
# File 'lib/sidekiq/queue_throttled/queue_limiter.rb', line 37 def current_count @mutex.with_read_lock { fetch_current_count } end |
#release_lock(lock_id) ⇒ Object
28 29 30 31 32 33 34 35 |
# File 'lib/sidekiq/queue_throttled/queue_limiter.rb', line 28 def release_lock(lock_id) return false unless lock_id @mutex.with_write_lock { true } rescue StandardError => e Sidekiq::QueueThrottled.logger.error "Failed to release lock #{lock_id} for queue #{@queue_name}: #{e.}" false end |
#reset! ⇒ Object
45 46 47 48 49 50 51 52 |
# File 'lib/sidekiq/queue_throttled/queue_limiter.rb', line 45 def reset! @mutex.with_write_lock do @redis.del(@counter_key) pattern = "#{Sidekiq::QueueThrottled.configuration.redis_key_prefix}:queue:#{@queue_name}:lock:*" keys = @redis.keys(pattern) @redis.del(*keys) unless keys.empty? end end |