Class: Sidekiq::Throttled::Strategy::Concurrency

Inherits:
Object
  • Object
show all
Includes:
Base
Defined in:
lib/sidekiq/throttled/strategy/concurrency.rb

Overview

Concurrency throttling strategy

Instance Method Summary collapse

Methods included from Base

#limit

Constructor Details

#initialize(strategy_key, limit:, avg_job_duration: nil, ttl: nil, lost_job_threshold: ttl, key_suffix: nil) ⇒ Concurrency

Deprecated.

@param [#to_i] ttl Obsolete alias for ‘lost_job_threshold`. Default: 900 or 3 * avg_job_duration

Returns a new instance of Concurrency.

Parameters:

  • strategy_key (#to_s)
  • limit (#to_i, #call)

    Amount of allowed concurrent jobs per processors running for given key.

  • avg_job_duration (#to_i) (defaults to: nil)

    Average number of seconds needed to complete a job of this type. Default: 300 or 1/3 of lost_job_threshold

  • lost_job_threshold (#to_i) (defaults to: ttl)

    Seconds to wait before considering a job lost or dead. Default: 900 or 3 * avg_job_duration

  • key_suffix (Proc) (defaults to: nil)

    Dynamic key suffix generator.

Raises:

  • (ArgumentError)


36
37
38
39
40
41
42
43
44
# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 36

def initialize(strategy_key, limit:, avg_job_duration: nil, ttl: nil, lost_job_threshold: ttl, key_suffix: nil) # rubocop:disable Metrics/ParameterLists
  @base_key = "#{strategy_key}:concurrency.v2"
  @limit = limit
  @avg_job_duration, @lost_job_threshold = interp_duration_args(avg_job_duration, lost_job_threshold)
  @key_suffix = key_suffix

  raise(ArgumentError, "lost_job_threshold must be greater than avg_job_duration") if
    @lost_job_threshold <= @avg_job_duration
end

Instance Method Details

#count(*job_args) ⇒ Integer

Returns Current count of jobs.

Returns:

  • (Integer)

    Current count of jobs



72
73
74
# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 72

def count(*job_args)
  Sidekiq.redis { |conn| conn.zcard(key(job_args)) }.to_i
end

#dynamic?Boolean

Returns Whenever strategy has dynamic config.

Returns:

  • (Boolean)

    Whenever strategy has dynamic config



47
48
49
# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 47

def dynamic?
  @key_suffix || @limit.respond_to?(:call)
end

#finalize!(jid, *job_args) ⇒ void

This method returns an undefined value.

Remove jid from the pool of jobs in progress



84
85
86
87
88
# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 84

def finalize!(jid, *job_args)
  Sidekiq.redis do |conn|
    conn.zrem(key(job_args), jid.to_s)
  end
end

#reset!(*job_args) ⇒ void

This method returns an undefined value.

Resets count of jobs



78
79
80
# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 78

def reset!(*job_args)
  Sidekiq.redis { |conn| conn.del(key(job_args)) }
end

#retry_in(_jid, *job_args) ⇒ Float

Returns How long, in seconds, before we’ll next be able to take on jobs.

Returns:

  • (Float)

    How long, in seconds, before we’ll next be able to take on jobs



64
65
66
67
68
69
# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 64

def retry_in(_jid, *job_args)
  job_limit = limit(job_args)
  return 0.0 if !job_limit || count(*job_args) < job_limit

  estimated_backlog_size(job_args) * @avg_job_duration / limit(job_args)
end

#throttled?(jid, *job_args) ⇒ Boolean

Returns whenever job is throttled or not.

Returns:

  • (Boolean)

    whenever job is throttled or not



52
53
54
55
56
57
58
59
60
61
# File 'lib/sidekiq/throttled/strategy/concurrency.rb', line 52

def throttled?(jid, *job_args)
  job_limit = limit(job_args)
  return false unless job_limit
  return true if job_limit <= 0

  keys = [key(job_args), backlog_info_key(job_args)]
  argv = [jid.to_s, job_limit, @lost_job_threshold, Time.now.to_f]

  Sidekiq.redis { |redis| (1 == SCRIPT.call(redis, keys: keys, argv: argv)) }
end