Class: Sidekiq::Middleware::Server::MaxJobs
- Inherits:
-
Object
- Object
- Sidekiq::Middleware::Server::MaxJobs
- Defined in:
- lib/sidekiq/middleware/server/max_jobs.rb
Constant Summary collapse
- VERSION =
File.read( File.join( File.dirname(__FILE__), '..', '..', '..', '..', 'VERSION' ) ).strip
Class Method Summary collapse
- .cache ⇒ Object
- .counter ⇒ Object
- .counter_for_queue(queue) ⇒ Object
- .counter_for_queue_key(queue) ⇒ Object
- .counter_key ⇒ Object
- .default_max_jobs ⇒ Object
- .default_max_jobs_jitter ⇒ Object
- .increment_counter! ⇒ Object
- .increment_counter_for_queue!(queue) ⇒ Object
- .log_info(message) ⇒ Object
- .log_initialization! ⇒ Object
- .max_jobs ⇒ Object
- .max_jobs_for_queue(queue) ⇒ Object
- .max_jobs_for_queue_key(queue) ⇒ Object
- .max_jobs_jitter ⇒ Object
- .max_jobs_jitter_for_queue(queue) ⇒ Object
- .max_jobs_jitter_for_queue_key(queue) ⇒ Object
- .max_jobs_jitter_key ⇒ Object
- .max_jobs_key ⇒ Object
- .max_jobs_with_jitter ⇒ Object
- .max_jobs_with_jitter_for_queue(queue) ⇒ Object
- .max_jobs_with_jitter_for_queue_key(queue) ⇒ Object
- .max_jobs_with_jitter_key ⇒ Object
- .mutex ⇒ Object
- .mutex_key ⇒ Object
- .pid ⇒ Object
- .pid_key ⇒ Object
- .quota_met? ⇒ Boolean
- .quota_met_for_queue?(queue) ⇒ Boolean
Instance Method Summary collapse
Class Method Details
.cache ⇒ Object
19 20 21 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 19
#
# Lazily created Hash used as the shared memoization store.
#
# @return [Hash] the same Hash instance on every call
def cache
  return @cache if @cache

  @cache = {}
end
.counter ⇒ Object
23 24 25 26 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 23
#
# Reads the overall counter from the cache, storing 0 under `counter_key`
# the first time it is requested.
#
# @return [Integer] the current value stored under `counter_key`
def counter
  cache[counter_key] ||= 0
end
.counter_for_queue(queue) ⇒ Object
28 29 30 31 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 28
#
# Reads the counter kept for one queue, storing 0 under that queue's
# counter key the first time it is requested.
#
# @param queue [#upcase] queue name
# @return [Integer] the current value stored for the queue
def counter_for_queue(queue)
  cache[counter_for_queue_key(queue)] ||= 0
end
.counter_for_queue_key(queue) ⇒ Object
33 34 35 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 33
#
# Builds the cache key under which a queue's counter is stored.
#
# @param queue [#upcase] queue name
# @return [String] "COUNTER_" followed by the upcased queue name
def counter_for_queue_key(queue)
  suffix = queue.upcase
  "COUNTER_#{suffix}"
end
.counter_key ⇒ Object
37 38 39 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 37
#
# Cache key under which the overall counter is stored.
#
# @return [String]
def counter_key
  'COUNTER'
end
.default_max_jobs ⇒ Object
41 42 43 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 41
#
# Fallback job limit used when no limit is configured in the environment.
#
# @return [Integer]
def default_max_jobs
  100
end
.default_max_jobs_jitter ⇒ Object
45 46 47 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 45
#
# Fallback jitter bound used when none is configured in the environment.
#
# @return [Integer]
def default_max_jobs_jitter
  1
end
.increment_counter! ⇒ Object
49 50 51 52 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 49
#
# Adds one to the overall counter, treating a missing entry as 0.
#
# @return [Integer] the counter value after the increment
def increment_counter!
  slot = counter_key
  previous = cache[slot] || 0
  cache[slot] = previous + 1
end
.increment_counter_for_queue!(queue) ⇒ Object
54 55 56 57 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 54
#
# Adds one to the counter kept for a single queue, treating a missing
# entry as 0.
#
# @param queue [#upcase] queue name
# @return [Integer] the queue's counter value after the increment
def increment_counter_for_queue!(queue)
  slot = counter_for_queue_key(queue)
  previous = cache[slot] || 0
  cache[slot] = previous + 1
end
.log_info(message) ⇒ Object
59 60 61 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 59
#
# Writes an info-level line through the Sidekiq logger. Does nothing when
# `::Sidekiq.logger` is not defined.
#
# @param message [String] text to log
# @return [Object, nil] whatever the logger's `info` returns, or nil when
#   no logger is available
def log_info(message)
  ::Sidekiq.logger.info(message) if defined?(::Sidekiq.logger)
end
.log_initialization! ⇒ Object
63 64 65 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 63
#
# Logs a one-line notice that the middleware is enabled, including the
# pid that will be shut down once the quota is reached.
#
# @return [Object, nil] the result of `log_info`
def log_initialization!
  notice = "Max-Jobs middleware enabled, shutting down pid: #{pid} when max-jobs quota is reached"
  log_info(notice)
end
.max_jobs ⇒ Object
67 68 69 70 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 67
#
# Overall job limit: the environment variable named by `max_jobs_key`
# converted with `to_i`, or `default_max_jobs` when it is unset. The
# result is memoized in the cache.
#
# @return [Integer]
def max_jobs
  env_name = max_jobs_key
  cache[env_name] ||= ENV.fetch(env_name) { default_max_jobs }.to_i
end
.max_jobs_for_queue(queue) ⇒ Object
72 73 74 75 76 77 78 79 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 72
#
# Job limit for a single queue, memoized in the cache. Lookup order:
# the queue-specific environment variable, then the overall one named by
# `max_jobs_key`, then `default_max_jobs`. The chosen value is converted
# with `to_i`.
#
# @param queue [#upcase] queue name
# @return [Integer]
def max_jobs_for_queue(queue)
  queue_key = max_jobs_for_queue_key(queue)
  return cache[queue_key] if cache[queue_key]

  configured = ENV.fetch(queue_key) do
    ENV.fetch(max_jobs_key) { default_max_jobs }
  end
  cache[queue_key] = configured.to_i
end
.max_jobs_for_queue_key(queue) ⇒ Object
81 82 83 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 81
#
# Name used both as the environment variable and as the cache key for a
# queue's job limit.
#
# @param queue [#upcase] queue name
# @return [String] "MAX_JOBS_" followed by the upcased queue name
def max_jobs_for_queue_key(queue)
  suffix = queue.upcase
  "MAX_JOBS_#{suffix}"
end
.max_jobs_jitter ⇒ Object
85 86 87 88 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 85
#
# Random jitter added to the overall job limit, drawn once and memoized in
# the cache. The bound is the environment variable named by
# `max_jobs_jitter_key` converted with `to_i`, or `default_max_jobs_jitter`
# when it is unset.
#
# @return [Integer] 0 when the bound is 0, otherwise the result of
#   `rand(bound)`
def max_jobs_jitter
  key = max_jobs_jitter_key
  cache[key] ||= begin
    bound = (ENV[key] || default_max_jobs_jitter).to_i
    # Kernel#rand(0) returns a Float in [0, 1). A fractional limit can never
    # equal the Integer counter, so a zero bound must mean "no jitter".
    bound.zero? ? 0 : rand(bound)
  end
end
.max_jobs_jitter_for_queue(queue) ⇒ Object
90 91 92 93 94 95 96 97 98 99 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 90
#
# Random jitter added to one queue's job limit, drawn once per queue and
# memoized in the cache. The bound is looked up in this order: the
# queue-specific environment variable, the overall one named by
# `max_jobs_jitter_key`, then `default_max_jobs_jitter`; the chosen value is
# converted with `to_i`.
#
# @param queue [#upcase] queue name
# @return [Integer] 0 when the bound is 0, otherwise the result of
#   `rand(bound)`
def max_jobs_jitter_for_queue(queue)
  key = max_jobs_jitter_for_queue_key(queue)
  cache[key] ||= begin
    bound = (
      ENV[key] ||
      ENV[max_jobs_jitter_key] ||
      default_max_jobs_jitter
    ).to_i
    # Kernel#rand(0) returns a Float in [0, 1). A fractional limit can never
    # equal the Integer counter, so a zero bound must mean "no jitter".
    bound.zero? ? 0 : rand(bound)
  end
end
.max_jobs_jitter_for_queue_key(queue) ⇒ Object
101 102 103 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 101
#
# Name used both as the environment variable and as the cache key for a
# queue's jitter.
#
# @param queue [#upcase] queue name
# @return [String] "MAX_JOBS_JITTER_" followed by the upcased queue name
def max_jobs_jitter_for_queue_key(queue)
  suffix = queue.upcase
  "MAX_JOBS_JITTER_#{suffix}"
end
.max_jobs_jitter_key ⇒ Object
105 106 107 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 105
#
# Name used both as the environment variable and as the cache key for the
# overall jitter.
#
# @return [String]
def max_jobs_jitter_key
  'MAX_JOBS_JITTER'
end
.max_jobs_key ⇒ Object
109 110 111 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 109
#
# Name used both as the environment variable and as the cache key for the
# overall job limit.
#
# @return [String]
def max_jobs_key
  'MAX_JOBS'
end
.max_jobs_with_jitter ⇒ Object
113 114 115 116 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 113
#
# Effective overall limit: `max_jobs` plus `max_jobs_jitter`, computed once
# and memoized in the cache.
#
# @return [Numeric] the sum of the limit and the jitter
def max_jobs_with_jitter
  slot = max_jobs_with_jitter_key
  return cache[slot] if cache[slot]

  limit = max_jobs
  jitter = max_jobs_jitter
  cache[slot] = limit + jitter
end
.max_jobs_with_jitter_for_queue(queue) ⇒ Object
118 119 120 121 122 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 118
#
# Effective limit for one queue: `max_jobs_for_queue` plus
# `max_jobs_jitter_for_queue`, computed once per queue and memoized in the
# cache.
#
# @param queue [#upcase] queue name
# @return [Numeric] the sum of the queue's limit and jitter
def max_jobs_with_jitter_for_queue(queue)
  slot = max_jobs_with_jitter_for_queue_key(queue)
  return cache[slot] if cache[slot]

  limit = max_jobs_for_queue(queue)
  jitter = max_jobs_jitter_for_queue(queue)
  cache[slot] = limit + jitter
end
.max_jobs_with_jitter_for_queue_key(queue) ⇒ Object
124 125 126 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 124
#
# Cache key under which a queue's effective limit is stored.
#
# @param queue [#upcase] queue name
# @return [String] "MAX_JOBS_WITH_JITTER_" followed by the upcased queue name
def max_jobs_with_jitter_for_queue_key(queue)
  suffix = queue.upcase
  "MAX_JOBS_WITH_JITTER_#{suffix}"
end
.max_jobs_with_jitter_key ⇒ Object
128 129 130 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 128
#
# Cache key under which the effective overall limit is stored.
#
# @return [String]
def max_jobs_with_jitter_key
  'MAX_JOBS_WITH_JITTER'
end
.mutex ⇒ Object
132 133 134 135 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 132
#
# Mutex stored in the cache under `mutex_key`, created on first use.
#
# @return [Mutex] the same Mutex instance on every call
def mutex
  cache[mutex_key] ||= ::Mutex.new
end
.mutex_key ⇒ Object
137 138 139 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 137
#
# Cache key under which the mutex is stored.
#
# @return [String]
def mutex_key
  'MUTEX'
end
.pid ⇒ Object
141 142 143 144 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 141
#
# Process id stored in the cache under `pid_key`; read from `Process.pid`
# the first time it is requested.
#
# @return [Integer]
def pid
  cache[pid_key] ||= ::Process.pid
end
.pid_key ⇒ Object
146 147 148 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 146
#
# Cache key under which the process id is stored.
#
# @return [String]
def pid_key
  'PID'
end
.quota_met? ⇒ Boolean
150 151 152 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 150
#
# Whether the overall counter is exactly equal to the effective overall
# limit. The comparison is equality, not "greater than or equal".
#
# @return [Boolean]
def quota_met?
  processed = counter
  limit = max_jobs_with_jitter
  processed == limit
end
.quota_met_for_queue?(queue) ⇒ Boolean
154 155 156 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 154
#
# Whether a queue's counter is exactly equal to that queue's effective
# limit. The comparison is equality, not "greater than or equal".
#
# @param queue [#upcase] queue name
# @return [Boolean]
def quota_met_for_queue?(queue)
  processed = counter_for_queue(queue)
  limit = max_jobs_with_jitter_for_queue(queue)
  processed == limit
end
Instance Method Details
#call(_, _, queue) ⇒ Object
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 159
#
# Runs the wrapped block and, unless it raised, counts it against the
# overall and per-queue quotas. When either quota is met, sends TERM to the
# pid returned by the class-level `pid`.
#
# @param _ [Object] worker instance (unused)
# @param _ [Object] item (unused)
# @param queue [#upcase] queue name
# @yield the wrapped work
# @return [Object] the value returned by the block
# @raise [Exception] re-raises whatever the block raised
def call(_, _, queue)
  failed = false
  yield
rescue Exception
  # Remember the failure so the `ensure` section skips the counters, then
  # hand the exception back to the caller untouched.
  failed = true
  raise
ensure
  unless failed
    middleware = self.class

    # Counter updates, quota checks, and the kill all happen under the mutex.
    middleware.mutex.synchronize do
      # Overall counter and quota come first.
      middleware.increment_counter!
      total_met = middleware.quota_met?
      middleware.log_info("Max-Jobs quota met, shutting down pid: #{middleware.pid}") if total_met

      # The per-queue counter is always incremented; its quota is only
      # consulted when the overall quota has not already been met.
      middleware.increment_counter_for_queue!(queue)
      queue_met = !total_met && middleware.quota_met_for_queue?(queue)
      if queue_met
        middleware.log_info(%(Max-Jobs quota met for queue: "#{queue}", shutting down pid: #{middleware.pid}))
      end

      # If applicable, TERMinate the `Process`
      ::Process.kill('TERM', middleware.pid) if total_met || queue_met
    end
  end
end