Class: Sidekiq::QueueThrottled::JobThrottler
- Inherits:
  Object
  - Object
    - Sidekiq::QueueThrottled::JobThrottler
- Includes:
- RedisKeyManager
- Defined in:
- lib/sidekiq/queue_throttled/job_throttler.rb
Instance Attribute Summary
-
#job_class ⇒ Object
readonly
Returns the value of attribute job_class.
-
#redis ⇒ Object
readonly
Returns the value of attribute redis.
-
#throttle_config ⇒ Object
readonly
Returns the value of attribute throttle_config.
Instance Method Summary
- #acquire_slot(args) ⇒ Object
- #can_process?(args) ⇒ Boolean
-
#initialize(job_class, throttle_config, redis = nil) ⇒ JobThrottler
constructor
A new instance of JobThrottler.
- #release_slot(args) ⇒ Object
Methods included from RedisKeyManager
#concurrency_key, #rate_key, #resolve_key_suffix
Constructor Details
#initialize(job_class, throttle_config, redis = nil) ⇒ JobThrottler
Returns a new instance of JobThrottler.
34 35 36 37 38 39 |
# File 'lib/sidekiq/queue_throttled/job_throttler.rb', line 34

# Builds a throttler for one job class.
#
# @param job_class [Object] stored as-is and exposed through #job_class
# @param throttle_config [Object] stored as-is; other methods on this page
#   index it with :concurrency and :rate and treat a nil/false value as
#   "no throttling"
# @param redis [Object, nil] connection to use; when nil (or false) the
#   value of Sidekiq::QueueThrottled.redis is used instead
def initialize(job_class, throttle_config, redis = nil)
  @job_class, @throttle_config = job_class, throttle_config
  @redis = redis || Sidekiq::QueueThrottled.redis
  # Read/write lock used by the slot methods on this instance.
  @mutex = Concurrent::ReentrantReadWriteLock.new
end
Instance Attribute Details
#job_class ⇒ Object (readonly)
Returns the value of attribute job_class.
32 33 34 |
# File 'lib/sidekiq/queue_throttled/job_throttler.rb', line 32

# @return [Object] the job class given to the constructor
def job_class
  @job_class
end
#redis ⇒ Object (readonly)
Returns the value of attribute redis.
32 33 34 |
# File 'lib/sidekiq/queue_throttled/job_throttler.rb', line 32

# @return [Object] the redis handle chosen by the constructor
def redis
  @redis
end
#throttle_config ⇒ Object (readonly)
Returns the value of attribute throttle_config.
32 33 34 |
# File 'lib/sidekiq/queue_throttled/job_throttler.rb', line 32

# @return [Object] the throttle configuration given to the constructor
def throttle_config
  @throttle_config
end
Instance Method Details
#acquire_slot(args) ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/sidekiq/queue_throttled/job_throttler.rb', line 47

# Tries to take a throttle slot for a job while holding the write lock.
#
# Returns true immediately when no throttle config is set. Otherwise the
# result is, in order: false if #can_process? rejects the job, the result of
# acquire_concurrency_slot? when :concurrency is configured, the result of
# acquire_rate_slot? when :rate is configured, and true when neither is.
#
# NOTE(review): when both :concurrency and :rate are configured only the
# concurrency slot is acquired here; acquire_rate_slot? is never reached.
# Confirm that this is intended.
#
# @param args [Object] job arguments, forwarded unchanged to the helpers
# @return [Object] true/false, or whatever the selected acquire helper returns
def acquire_slot(args)
  return true unless @throttle_config

  # The value of the block is the value of the method.
  @mutex.with_write_lock do
    if !can_process?(args)
      false
    elsif @throttle_config[:concurrency]
      acquire_concurrency_slot?(args)
    elsif @throttle_config[:rate]
      acquire_rate_slot?(args)
    else
      true
    end
  end
end
#can_process?(args) ⇒ Boolean
41 42 43 44 45 |
# File 'lib/sidekiq/queue_throttled/job_throttler.rb', line 41

# Reports whether a job passes both the concurrency and the rate check.
#
# Without a throttle config every job is allowed. With one, both checks run
# under the read lock; the rate check is skipped when the concurrency check
# already returned a falsy value.
#
# @param args [Object] job arguments, forwarded unchanged to the checks
# @return [Boolean] true when there is no throttle config; otherwise the
#   combined result of concurrency_allowed? and rate_allowed?
def can_process?(args)
  if @throttle_config
    @mutex.with_read_lock do
      concurrency_ok = concurrency_allowed?(args)
      concurrency_ok && rate_allowed?(args)
    end
  else
    true
  end
end
#release_slot(args) ⇒ Object
59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/sidekiq/queue_throttled/job_throttler.rb', line 59

# Gives back the concurrency slot held for a job.
#
# Nothing is released when no throttle config is set or when the config has
# no :concurrency entry; only the concurrency slot is handled here.
#
# @param args [Object] job arguments, forwarded to release_concurrency_slot?
# @return [Boolean] true when there was nothing to do or the release ran
#   without raising; false when a StandardError was rescued and logged
def release_slot(args)
  return true unless @throttle_config

  @mutex.with_write_lock do
    release_concurrency_slot?(args) if @throttle_config[:concurrency]
  end
  true
rescue StandardError => e
  # Best effort: report the failure to the caller instead of raising, and
  # include the exception message so the log entry is actionable.
  Sidekiq::QueueThrottled.logger.error "Failed to release slot for job #{@job_class}: #{e.message}"
  false
end