Class: Sidekiq::QueueThrottled::JobThrottler

Inherits:
Object
  • Object
show all
Includes:
RedisKeyManager
Defined in:
lib/sidekiq/queue_throttled/job_throttler.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from RedisKeyManager

#concurrency_key, #rate_key, #resolve_key_suffix

Constructor Details

#initialize(job_class, throttle_config, redis = nil) ⇒ JobThrottler

Returns a new instance of JobThrottler.



34
35
36
37
38
39
# File 'lib/sidekiq/queue_throttled/job_throttler.rb', line 34

# Builds a throttler bound to one job class and its throttle settings.
#
# @param job_class [Object] stored unchanged and exposed through #job_class
# @param throttle_config [Hash, nil] stored unchanged; the public methods of
#   this class treat a nil/false config as "no throttling"
# @param redis [Object, nil] Redis handle to use; when nil (or false) the
#   value of Sidekiq::QueueThrottled.redis is used instead
def initialize(job_class, throttle_config, redis = nil)
  @throttle_config = throttle_config
  @job_class = job_class

  # Only consult the library-wide default when the caller supplied nothing.
  @redis = redis
  @redis ||= Sidekiq::QueueThrottled.redis

  # Guards the slot bookkeeping done by the public methods of this instance.
  @mutex = Concurrent::ReentrantReadWriteLock.new
end

Instance Attribute Details

#job_class ⇒ Object (readonly)

Returns the value of attribute job_class.



32
33
34
# File 'lib/sidekiq/queue_throttled/job_throttler.rb', line 32

# Read-only accessor for the job class this throttler was constructed with.
#
# @return [Object] the value held in @job_class
def job_class
  @job_class
end

#redis ⇒ Object (readonly)

Returns the value of attribute redis.



32
33
34
# File 'lib/sidekiq/queue_throttled/job_throttler.rb', line 32

# Read-only accessor for the Redis handle chosen at construction time
# (the explicit argument, or Sidekiq::QueueThrottled.redis when none was given).
#
# @return [Object] the value held in @redis
def redis
  @redis
end

#throttle_config ⇒ Object (readonly)

Returns the value of attribute throttle_config.



32
33
34
# File 'lib/sidekiq/queue_throttled/job_throttler.rb', line 32

# Read-only accessor for the throttle settings this throttler was constructed with.
#
# @return [Hash, nil] the value held in @throttle_config; nil/false makes the
#   public slot methods of this class short-circuit to true
def throttle_config
  @throttle_config
end

Instance Method Details

#acquire_slot(args) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
# File 'lib/sidekiq/queue_throttled/job_throttler.rb', line 47

# Tries to reserve capacity for one job run under every configured limit.
#
# Returns true immediately when no throttle config is set. Otherwise, while
# holding the write lock, it first asks #can_process? and then acquires a
# slot for each limit present in the config (:concurrency and/or :rate).
#
# Previously the method returned right after the concurrency acquisition, so
# a config carrying both :concurrency and :rate never called
# acquire_rate_slot?. Both are now acquired; if the rate slot is refused
# after the concurrency slot was taken, the concurrency slot is handed back
# so a rejected job does not keep holding it.
# NOTE(review): the rollback assumes release_concurrency_slot? undoes
# acquire_concurrency_slot? — confirm against the helper implementations.
#
# @param args [Object] job arguments, forwarded unchanged to the helpers
# @return [Boolean] truthy when every configured slot was acquired (or
#   nothing is configured); falsy when processing is not allowed or any
#   acquisition was refused. With a single configured limit the helper's own
#   return value is passed through, as before.
def acquire_slot(args)
  return true unless @throttle_config

  @mutex.with_write_lock do
    return false unless can_process?(args)

    limits_concurrency = @throttle_config[:concurrency]
    limits_rate = @throttle_config[:rate]

    if limits_concurrency
      acquired = acquire_concurrency_slot?(args)
      # Done if the concurrency slot was refused, or if there is no rate
      # limit left to satisfy.
      return acquired unless acquired && limits_rate
    end

    return true unless limits_rate

    acquired = acquire_rate_slot?(args)
    # Undo the concurrency reservation when the rate limit rejects the job.
    release_concurrency_slot?(args) if limits_concurrency && !acquired
    acquired
  end
end

#can_process?(args) ⇒ Boolean

Returns:

  • (Boolean)


41
42
43
44
45
# File 'lib/sidekiq/queue_throttled/job_throttler.rb', line 41

# Reports whether a job with the given arguments may run right now.
#
# With no throttle config the answer is always true. Otherwise both the
# concurrency check and the rate check are evaluated under the read lock;
# the rate check is skipped when the concurrency check already said no.
#
# @param args [Object] job arguments, forwarded unchanged to the checks
# @return [Boolean] truthy when both checks pass (or nothing is configured)
def can_process?(args)
  if @throttle_config
    @mutex.with_read_lock do
      within_concurrency = concurrency_allowed?(args)
      within_concurrency && rate_allowed?(args)
    end
  else
    true
  end
end

#release_slot(args) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
# File 'lib/sidekiq/queue_throttled/job_throttler.rb', line 59

# Gives back whatever slot a finished job was holding.
#
# Nothing is done when no throttle config is set. Otherwise the write lock is
# taken and, if a :concurrency limit is configured, the concurrency slot is
# released. Any StandardError raised along the way is logged through
# Sidekiq::QueueThrottled.logger and reported as false instead of propagating.
#
# @param args [Object] job arguments, forwarded unchanged to the release helper
# @return [Boolean] true on success (or when nothing is configured), false
#   when an error was rescued
def release_slot(args)
  if @throttle_config
    @mutex.with_write_lock do
      # Only the concurrency limit holds a slot that is released here.
      next unless @throttle_config[:concurrency]

      release_concurrency_slot?(args)
    end
  end

  true
rescue StandardError => e
  Sidekiq::QueueThrottled.logger.error "Failed to release slot for job #{@job_class}: #{e.message}"
  false
end