Class: Sidekiq::Throttled::Strategy::Concurrency
- Inherits:
-
Object
- Object
- Sidekiq::Throttled::Strategy::Concurrency
- Includes:
- Base
- Defined in:
- lib/sidekiq/throttled/strategy/concurrency.rb
Overview
Concurrency throttling strategy
Instance Method Summary collapse
-
#count(*job_args) ⇒ Integer
Current count of jobs.
-
#dynamic? ⇒ Boolean
Whenever strategy has dynamic config.
-
#finalize!(jid, *job_args) ⇒ void
Remove jid from the pool of jobs in progress.
-
#initialize(strategy_key, limit:, avg_job_duration: nil, ttl: nil, lost_job_threshold: ttl, key_suffix: nil) ⇒ Concurrency
constructor
deprecated
Deprecated.
@param [#to_i] ttl Obsolete alias for ‘lost_job_threshold`. Default: 900 or 3 * avg_job_duration
-
#reset!(*job_args) ⇒ void
Resets count of jobs.
-
#retry_in(_jid, *job_args) ⇒ Float
How long, in seconds, before we’ll next be able to take on jobs.
-
#throttled?(jid, *job_args) ⇒ Boolean
Whenever job is throttled or not.
Methods included from Base
Constructor Details
#initialize(strategy_key, limit:, avg_job_duration: nil, ttl: nil, lost_job_threshold: ttl, key_suffix: nil) ⇒ Concurrency
@param [#to_i] ttl Obsolete alias for ‘lost_job_threshold`. Default: 900 or 3 * avg_job_duration
Returns a new instance of Concurrency.
36 37 38 39 40 41 42 43 44 |
# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 36 def initialize(strategy_key, limit:, avg_job_duration: nil, ttl: nil, lost_job_threshold: ttl, key_suffix: nil) # rubocop:disable Metrics/ParameterLists @base_key = "#{strategy_key}:concurrency.v2" @limit = limit @avg_job_duration, @lost_job_threshold = interp_duration_args(avg_job_duration, lost_job_threshold) @key_suffix = key_suffix raise(ArgumentError, "lost_job_threshold must be greater than avg_job_duration") if @lost_job_threshold <= @avg_job_duration end |
Instance Method Details
#count(*job_args) ⇒ Integer
Returns Current count of jobs.
72 73 74 |
# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 72 def count(*job_args) Sidekiq.redis { |conn| conn.zcard(key(job_args)) }.to_i end |
#dynamic? ⇒ Boolean
Returns Whenever strategy has dynamic config.
47 48 49 |
# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 47 def dynamic? @key_suffix || @limit.respond_to?(:call) end |
#finalize!(jid, *job_args) ⇒ void
This method returns an undefined value.
Remove jid from the pool of jobs in progress
84 85 86 87 88 |
# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 84 def finalize!(jid, *job_args) Sidekiq.redis do |conn| conn.zrem(key(job_args), jid.to_s) end end |
#reset!(*job_args) ⇒ void
This method returns an undefined value.
Resets count of jobs
78 79 80 |
# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 78 def reset!(*job_args) Sidekiq.redis { |conn| conn.del(key(job_args)) } end |
#retry_in(_jid, *job_args) ⇒ Float
Returns How long, in seconds, before we’ll next be able to take on jobs.
64 65 66 67 68 69 |
# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 64 def retry_in(_jid, *job_args) job_limit = limit(job_args) return 0.0 if !job_limit || count(*job_args) < job_limit estimated_backlog_size(job_args) * @avg_job_duration / limit(job_args) end |
#throttled?(jid, *job_args) ⇒ Boolean
Returns whenever job is throttled or not.
52 53 54 55 56 57 58 59 60 61 |
# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 52 def throttled?(jid, *job_args) job_limit = limit(job_args) return false unless job_limit return true if job_limit <= 0 keys = [key(job_args), backlog_info_key(job_args)] argv = [jid.to_s, job_limit, @lost_job_threshold, Time.now.to_f] Sidekiq.redis { |redis| (1 == SCRIPT.call(redis, keys: keys, argv: argv)) } end |