Class: Sidekiq::Throttled::Strategy
- Inherits: Object
- Defined in:
  lib/sidekiq/throttled/strategy.rb,
  lib/sidekiq/throttled/strategy/base.rb,
  lib/sidekiq/throttled/strategy/threshold.rb,
  lib/sidekiq/throttled/strategy/concurrency.rb
Overview
Meta-strategy that couples Concurrency and Threshold strategies.
Defined Under Namespace
Modules: Base
Classes: Concurrency, Threshold
Constant Summary
- VALID_VALUES_FOR_REQUEUE_WITH = %i[enqueue schedule].freeze
  :enqueue puts the job back at the end of the queue immediately. :schedule schedules the job to be enqueued at a later time, when capacity is expected to be available.
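As an illustration of how such a constant can be used, here is a minimal sketch of checking a `:with` value. The helper name `valid_requeue_with?` is hypothetical and not part of the library. The sketch assumes, based on the `Proc, Symbol` return type documented for #requeue_with, that a callable is accepted as well as the two symbols.

```ruby
# Mirrors the documented constant; redefined here so the sketch runs on its own.
VALID_VALUES_FOR_REQUEUE_WITH = %i[enqueue schedule].freeze

# Hypothetical helper (not part of sidekiq-throttled): accepts one of the
# known symbols, or anything callable that is expected to return one later.
def valid_requeue_with?(value)
  VALID_VALUES_FOR_REQUEUE_WITH.include?(value) || value.respond_to?(:call)
end

symbol_ok   = valid_requeue_with?(:enqueue)                  # known symbol
callable_ok = valid_requeue_with?(->(*_args) { :schedule })  # resolved at requeue time
unknown_ok  = valid_requeue_with?(:retry)                    # neither symbol nor callable
```

Checking for `respond_to?(:call)` rather than `is_a?(Proc)` keeps the check open to lambdas, method objects, and any other callable.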
Instance Attribute Summary
- #concurrency ⇒ Object (readonly)
  Returns the value of attribute concurrency.
- #observer ⇒ Object (readonly)
  Returns the value of attribute observer.
- #requeue_options ⇒ Object (readonly)
  Returns the value of attribute requeue_options.
- #threshold ⇒ Object (readonly)
  Returns the value of attribute threshold.
Instance Method Summary
- #dynamic? ⇒ Boolean
  Whether the strategy has dynamic config.
- #finalize!(jid, *job_args) ⇒ void
  Marks job as being processed.
- #initialize(name, concurrency: nil, threshold: nil, key_suffix: nil, observer: nil, requeue: nil) ⇒ Strategy (constructor)
  A new instance of Strategy.
- #requeue_throttled(work) ⇒ void
  Return throttled job to be executed later.
- #requeue_to ⇒ String?
  Name of the queue to re-queue the job to.
- #requeue_with ⇒ Proc, Symbol
  How to requeue the throttled job.
- #reset! ⇒ void
  Resets count of jobs of all available strategies.
- #throttled?(jid, *job_args) ⇒ Boolean
  Whether the job is throttled.
Constructor Details
#initialize(name, concurrency: nil, threshold: nil, key_suffix: nil, observer: nil, requeue: nil) ⇒ Strategy
Returns a new instance of Strategy.
    # File 'lib/sidekiq/throttled/strategy.rb', line 43

    def initialize(name, concurrency: nil, threshold: nil, key_suffix: nil, observer: nil, requeue: nil) # rubocop:disable Metrics/MethodLength, Metrics/ParameterLists
      @observer = observer

      @concurrency = StrategyCollection.new(concurrency,
        strategy:   Concurrency,
        name:       name,
        key_suffix: key_suffix)

      @threshold = StrategyCollection.new(threshold,
        strategy:   Threshold,
        name:       name,
        key_suffix: key_suffix)

      @requeue_options = Throttled.config..merge(requeue || {})

      validate!
    end
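The last assignment in the constructor merges configuration-level defaults with the per-strategy `requeue:` hash. The following is a minimal sketch of that merge, not the library's code: `DEFAULT_REQUEUE` and `build_requeue_options` are hypothetical names, and the default value shown is an assumption standing in for whatever the global configuration provides.

```ruby
# Hypothetical global defaults; in the library these come from Throttled.config.
DEFAULT_REQUEUE = { with: :enqueue }.freeze

# Same merge shape as the constructor: per-strategy options win over defaults,
# and a nil `requeue:` argument falls back to the defaults unchanged.
def build_requeue_options(requeue)
  DEFAULT_REQUEUE.merge(requeue || {})
end

defaults_only = build_requeue_options(nil)
overridden    = build_requeue_options({ with: :schedule, to: "low" })
```

Because `Hash#merge` returns a new hash, the frozen defaults are never mutated, and each strategy ends up with its own options hash.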
Instance Attribute Details
#concurrency ⇒ Object (readonly)
Returns the value of attribute concurrency.
    # File 'lib/sidekiq/throttled/strategy.rb', line 21

    def concurrency
      @concurrency
    end
#observer ⇒ Object (readonly)
Returns the value of attribute observer.
    # File 'lib/sidekiq/throttled/strategy.rb', line 29

    def observer
      @observer
    end
#requeue_options ⇒ Object (readonly)
Returns the value of attribute requeue_options.
    # File 'lib/sidekiq/throttled/strategy.rb', line 33

    def requeue_options
      @requeue_options
    end
#threshold ⇒ Object (readonly)
Returns the value of attribute threshold.
    # File 'lib/sidekiq/throttled/strategy.rb', line 25

    def threshold
      @threshold
    end
Instance Method Details
#dynamic? ⇒ Boolean
Returns whether the strategy has dynamic config.

    # File 'lib/sidekiq/throttled/strategy.rb', line 62

    def dynamic?
      return true if @concurrency&.dynamic?
      return true if @threshold&.dynamic?

      false
    end
#finalize!(jid, *job_args) ⇒ void
This method returns an undefined value.
Marks job as being processed.
    # File 'lib/sidekiq/throttled/strategy.rb', line 117

    def finalize!(jid, *job_args)
      @concurrency&.finalize!(jid, *job_args)
    end
#requeue_throttled(work) ⇒ void
This method returns an undefined value.
Returns the throttled job to the queue to be executed later. The implementation depends on the strategy's `requeue` options.

    # File 'lib/sidekiq/throttled/strategy.rb', line 98

    def requeue_throttled(work) # rubocop:disable Metrics/MethodLength
      # Resolve :with and :to options, calling them if they are Procs
      job_args = JSON.parse(work.job)["args"]
      with = requeue_with.respond_to?(:call) ? requeue_with.call(*job_args) : requeue_with
      target_queue = calc_target_queue(work)

      case with
      when :enqueue
        re_enqueue_throttled(work, target_queue)
      when :schedule
        # Find out when we will next be able to execute this job, and reschedule for then.
        reschedule_throttled(work, target_queue)
      else
        raise "unrecognized :with option #{with}"
      end
    end
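The first step of #requeue_throttled, resolving a `:with` option that may be either a Symbol or a callable, can be sketched in isolation. Everything below is illustrative: `resolve_with`, the payload contents, and the `by_tier` rule are hypothetical, and the payload is only assumed to be a JSON document with an "args" array, as the `JSON.parse(work.job)["args"]` line suggests.

```ruby
require "json"

# Resolve a :with option that may be a Symbol or a callable taking the job args.
def resolve_with(requeue_with, raw_job)
  job_args = JSON.parse(raw_job)["args"]
  requeue_with.respond_to?(:call) ? requeue_with.call(*job_args) : requeue_with
end

# Hypothetical payload, shaped like a serialized job with an "args" array.
raw_job = JSON.generate("class" => "MyJob", "args" => ["vip", 42])

# Hypothetical rule: "vip" jobs go straight back on the queue, others are scheduled.
by_tier = ->(tier, _id) { tier == "vip" ? :enqueue : :schedule }

static_choice  = resolve_with(:schedule, raw_job) # Symbol is returned as-is
dynamic_choice = resolve_with(by_tier, raw_job)   # callable receives "vip", 42
```

A callable lets the requeue decision depend on the job's arguments, while a plain Symbol applies the same behavior to every job using the strategy.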
#requeue_to ⇒ String?
Returns the name of the queue to re-queue the job to.

    # File 'lib/sidekiq/throttled/strategy.rb', line 92

    def requeue_to
      requeue_options[:to]
    end
#requeue_with ⇒ Proc, Symbol
Returns how to requeue the throttled job.

    # File 'lib/sidekiq/throttled/strategy.rb', line 87

    def requeue_with
      requeue_options[:with]
    end
#reset! ⇒ void
This method returns an undefined value.
Resets the job counts of all available strategies.

    # File 'lib/sidekiq/throttled/strategy.rb', line 123

    def reset!
      @concurrency&.reset!
      @threshold&.reset!
    end
#throttled?(jid, *job_args) ⇒ Boolean
Returns whether the job is throttled.

    # File 'lib/sidekiq/throttled/strategy.rb', line 70

    def throttled?(jid, *job_args)
      if @concurrency&.throttled?(jid, *job_args)
        @observer&.call(:concurrency, *job_args)
        return true
      end

      if @threshold&.throttled?(*job_args)
        @observer&.call(:threshold, *job_args)

        finalize!(jid, *job_args)
        return true
      end

      false
    end
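The control flow of #throttled? can be exercised without Redis by substituting stand-in objects. The sketch below is not the library's code: `FakeLimit` and the free function `throttled?` are hypothetical, and the comment on the `finalize!` call states an assumption about why the original releases the job at that point.

```ruby
# Stand-in for a concurrency or threshold collection; not a library class.
class FakeLimit
  attr_reader :finalized

  def initialize(throttled)
    @throttled = throttled
    @finalized = []
  end

  def throttled?(*_args)
    @throttled
  end

  def finalize!(jid, *_args)
    @finalized << jid
  end
end

# Same control flow as Strategy#throttled?, written as a free function.
def throttled?(concurrency, threshold, observer, jid, *job_args)
  if concurrency&.throttled?(jid, *job_args)
    observer&.call(:concurrency, *job_args)
    return true
  end

  if threshold&.throttled?(*job_args)
    observer&.call(:threshold, *job_args)
    # Assumption: the concurrency check has already registered this job,
    # so it is released again before reporting the job as throttled.
    concurrency&.finalize!(jid, *job_args)
    return true
  end

  false
end

events      = []
observer    = ->(kind, *args) { events << [kind, args] }
concurrency = FakeLimit.new(false) # a slot is available
threshold   = FakeLimit.new(true)  # rate limit exceeded

result = throttled?(concurrency, threshold, observer, "jid-1", "a")
```

In this run the observer is notified once with `:threshold`, and the concurrency stand-in records a `finalize!` call for "jid-1". Passing `nil` for either limit or for the observer is also safe, because every call goes through `&.`.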