Class: Sidekiq::Throttled::Strategy

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/throttled/strategy.rb,
lib/sidekiq/throttled/strategy/base.rb,
lib/sidekiq/throttled/strategy/threshold.rb,
lib/sidekiq/throttled/strategy/concurrency.rb

Overview

Meta-strategy that couples Concurrency and Threshold strategies.

Defined Under Namespace

Modules: Base Classes: Concurrency, Threshold

Constant Summary collapse

VALID_VALUES_FOR_REQUEUE_WITH =

:enqueue means put the job back at the end of the queue immediately :schedule means schedule enqueueing the job for a later time when we expect to have capacity

%i[enqueue schedule].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, concurrency: nil, threshold: nil, key_suffix: nil, observer: nil, requeue: nil) ⇒ Strategy

Returns a new instance of Strategy.

Parameters:

  • name (#to_s)
  • concurrency (Hash) (defaults to: nil)

    Concurrency options. See keyword args of Sidekiq::Throttled::Strategy::Concurrency#initialize for details.

  • threshold (Hash) (defaults to: nil)

    Threshold options. See keyword args of Sidekiq::Throttled::Strategy::Threshold#initialize for details.

  • key_suffix (#call) (defaults to: nil)

    Dynamic key suffix generator.

  • observer (#call) (defaults to: nil)

    Process called after throttled.

  • requeue (#call) (defaults to: nil)

    What to do with jobs that are throttled.



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/sidekiq/throttled/strategy.rb', line 43

def initialize(name, concurrency: nil, threshold: nil, key_suffix: nil, observer: nil, requeue: nil) # rubocop:disable Metrics/MethodLength, Metrics/ParameterLists
  @observer = observer

  @concurrency = StrategyCollection.new(concurrency,
    strategy:   Concurrency,
    name:       name,
    key_suffix: key_suffix)

  @threshold = StrategyCollection.new(threshold,
    strategy:   Threshold,
    name:       name,
    key_suffix: key_suffix)

  @requeue_options = Throttled.config.default_requeue_options.merge(requeue || {})

  validate!
end

Instance Attribute Details

#concurrencyObject (readonly)

Returns the value of attribute concurrency.



21
22
23
# File 'lib/sidekiq/throttled/strategy.rb', line 21

def concurrency
  @concurrency
end

#observerObject (readonly)

Returns the value of attribute observer.



29
30
31
# File 'lib/sidekiq/throttled/strategy.rb', line 29

def observer
  @observer
end

#requeue_optionsObject (readonly)

Returns the value of attribute requeue_options.



33
34
35
# File 'lib/sidekiq/throttled/strategy.rb', line 33

def requeue_options
  @requeue_options
end

#thresholdObject (readonly)

Returns the value of attribute threshold.



25
26
27
# File 'lib/sidekiq/throttled/strategy.rb', line 25

def threshold
  @threshold
end

Instance Method Details

#dynamic?Boolean

Returns whenever strategy has dynamic config.

Returns:

  • (Boolean)

    whenever strategy has dynamic config



62
63
64
65
66
67
# File 'lib/sidekiq/throttled/strategy.rb', line 62

def dynamic?
  return true if @concurrency&.dynamic?
  return true if @threshold&.dynamic?

  false
end

#finalize!(jid, *job_args) ⇒ void

This method returns an undefined value.

Marks job as being processed.



117
118
119
# File 'lib/sidekiq/throttled/strategy.rb', line 117

def finalize!(jid, *job_args)
  @concurrency&.finalize!(jid, *job_args)
end

#requeue_throttled(work) ⇒ void

This method returns an undefined value.

Return throttled job to be executed later. Implementation depends on the strategy’s ‘requeue` options.



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/sidekiq/throttled/strategy.rb', line 98

def requeue_throttled(work) # rubocop:disable Metrics/MethodLength
  # Resolve :with and :to options, calling them if they are Procs
  job_args = JSON.parse(work.job)["args"]
  with = requeue_with.respond_to?(:call) ? requeue_with.call(*job_args) : requeue_with
  target_queue = calc_target_queue(work)

  case with
  when :enqueue
    re_enqueue_throttled(work, target_queue)
  when :schedule
    # Find out when we will next be able to execute this job, and reschedule for then.
    reschedule_throttled(work, target_queue)
  else
    raise "unrecognized :with option #{with}"
  end
end

#requeue_toString?

Returns Name of the queue to re-queue the job to.

Returns:

  • (String, nil)

    Name of the queue to re-queue the job to.



92
93
94
# File 'lib/sidekiq/throttled/strategy.rb', line 92

def requeue_to
  requeue_options[:to]
end

#requeue_withProc, Symbol

Returns How to requeue the throttled job.

Returns:

  • (Proc, Symbol)

    How to requeue the throttled job



87
88
89
# File 'lib/sidekiq/throttled/strategy.rb', line 87

def requeue_with
  requeue_options[:with]
end

#reset!void

This method returns an undefined value.

Resets count of jobs of all available strategies



123
124
125
126
# File 'lib/sidekiq/throttled/strategy.rb', line 123

def reset!
  @concurrency&.reset!
  @threshold&.reset!
end

#throttled?(jid, *job_args) ⇒ Boolean

Returns whenever job is throttled or not.

Returns:

  • (Boolean)

    whenever job is throttled or not.



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/sidekiq/throttled/strategy.rb', line 70

def throttled?(jid, *job_args)
  if @concurrency&.throttled?(jid, *job_args)
    @observer&.call(:concurrency, *job_args)
    return true
  end

  if @threshold&.throttled?(*job_args)
    @observer&.call(:threshold, *job_args)

    finalize!(jid, *job_args)
    return true
  end

  false
end