Class: Sidekiq::Throttled::Strategy::Concurrency

Inherits:
Object
  • Object
show all
Includes:
Base
Defined in:
lib/sidekiq/throttled/strategy/concurrency.rb

Overview

Concurrency throttling strategy

Instance Method Summary collapse

Methods included from Base

#limit

Constructor Details

#initialize(strategy_key, limit:, avg_job_duration: nil, ttl: nil, lost_job_threshold: ttl, key_suffix: nil, max_delay: nil) ⇒ Concurrency

Deprecated.

@param [#to_i] ttl Obsolete alias for ‘lost_job_threshold`. Default: 900 or 3 * avg_job_duration

Returns a new instance of Concurrency.

Parameters:

  • strategy_key (#to_s)
  • limit (#to_i, #call)

    Amount of allowed concurrent jobs per processors running for given key.

  • avg_job_duration (#to_i) (defaults to: nil)

    Average number of seconds needed to complete a job of this type. Default: 300 or 1/3 of lost_job_threshold

  • lost_job_threshold (#to_i) (defaults to: ttl)

    Seconds to wait before considering a job lost or dead. Default: 900 or 3 * avg_job_duration

  • key_suffix (Proc) (defaults to: nil)

    Dynamic key suffix generator.

  • max_delay (#to_i) (defaults to: nil)

    Maximum number of seconds to delay a job when it throttled. This prevents jobs from being schedule very far in the future when the backlog is large. Default: the smaller of 30 minutes or 10 * avg_job_duration

Raises:

  • (ArgumentError)


39
40
41
42
43
44
45
46
47
48
49
# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 39

def initialize(strategy_key, limit:, avg_job_duration: nil, ttl: nil, # rubocop:disable Metrics/ParameterLists
               lost_job_threshold: ttl, key_suffix: nil, max_delay: nil)
  @base_key = "#{strategy_key}:concurrency.v2"
  @limit = limit
  @avg_job_duration, @lost_job_threshold = interp_duration_args(avg_job_duration, lost_job_threshold)
  @key_suffix = key_suffix
  @max_delay = max_delay || [(10 * @avg_job_duration), 1_800].min

  raise(ArgumentError, "lost_job_threshold must be greater than avg_job_duration") if
    @lost_job_threshold <= @avg_job_duration
end

Instance Method Details

#count(*job_args) ⇒ Integer

Returns Current count of jobs.

Returns:

  • (Integer)

    Current count of jobs



78
79
80
# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 78

def count(*job_args)
  Sidekiq.redis { |conn| conn.zcard(key(job_args)) }.to_i
end

#dynamic?Boolean

Returns Whenever strategy has dynamic config.

Returns:

  • (Boolean)

    Whenever strategy has dynamic config



52
53
54
# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 52

def dynamic?
  @key_suffix || @limit.respond_to?(:call)
end

#finalize!(jid, *job_args) ⇒ void

This method returns an undefined value.

Remove jid from the pool of jobs in progress



90
91
92
93
94
# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 90

def finalize!(jid, *job_args)
  Sidekiq.redis do |conn|
    conn.zrem(key(job_args), jid.to_s)
  end
end

#reset!(*job_args) ⇒ void

This method returns an undefined value.

Resets count of jobs



84
85
86
# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 84

def reset!(*job_args)
  Sidekiq.redis { |conn| conn.del(key(job_args)) }
end

#retry_in(_jid, *job_args) ⇒ Float

Returns How long, in seconds, before we’ll next be able to take on jobs.

Returns:

  • (Float)

    How long, in seconds, before we’ll next be able to take on jobs



69
70
71
72
73
74
75
# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 69

def retry_in(_jid, *job_args)
  job_limit = limit(job_args)
  return 0.0 if !job_limit || count(*job_args) < job_limit

  (estimated_backlog_size(job_args) * @avg_job_duration / limit(job_args))
    .then { |delay_sec| @max_delay * (1 - Math.exp(-delay_sec / @max_delay)) } # limit to max_delay
end

#throttled?(jid, *job_args) ⇒ Boolean

Returns whenever job is throttled or not.

Returns:

  • (Boolean)

    whenever job is throttled or not



57
58
59
60
61
62
63
64
65
66
# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 57

def throttled?(jid, *job_args)
  job_limit = limit(job_args)
  return false unless job_limit
  return true if job_limit <= 0

  keys = [key(job_args), backlog_info_key(job_args)]
  argv = [jid.to_s, job_limit, @lost_job_threshold, Time.now.to_f]

  Sidekiq.redis { |redis| 1 == SCRIPT.call(redis, keys: keys, argv: argv) }
end