Class: Sidekiq::Middleware::Server::MaxJobs
- Inherits:
-
Object
- Object
- Sidekiq::Middleware::Server::MaxJobs
- Defined in:
- lib/sidekiq/middleware/server/max_jobs.rb
Constant Summary collapse
- VERSION =
File.read( File.join( File.dirname(__FILE__), '..', '..', '..', '..', 'VERSION' ) ).strip
Class Method Summary collapse
- .cache ⇒ Object
- .counter(queue) ⇒ Object
- .counter_key(queue) ⇒ Object
- .increment_counter!(queue) ⇒ Object
- .log_info(message) ⇒ Object
- .log_initialization! ⇒ Object
- .max_jobs(queue) ⇒ Object
- .max_jobs_jitter(queue) ⇒ Object
- .max_jobs_jitter_key(queue) ⇒ Object
- .max_jobs_key(queue) ⇒ Object
- .max_jobs_with_jitter(queue) ⇒ Object
- .max_jobs_with_jitter_key(queue) ⇒ Object
- .mutex(queue) ⇒ Object
- .mutex_key(queue) ⇒ Object
- .pid ⇒ Object
Instance Method Summary collapse
Class Method Details
.cache ⇒ Object
19 20 21 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 19 def cache @cache ||= {} end |
.counter(queue) ⇒ Object
23 24 25 26 27 28 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 23 def counter(queue) key = counter_key(queue) return cache[key] if cache.include?(key) cache[key] = 0 end |
.counter_key(queue) ⇒ Object
30 31 32 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 30 def counter_key(queue) "COUNTER_#{queue.upcase}" end |
.increment_counter!(queue) ⇒ Object
34 35 36 37 38 39 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 34 def increment_counter!(queue) key = counter_key(queue) counter = cache[key] || 0 cache[key] = counter.next end |
.log_info(message) ⇒ Object
41 42 43 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 41 def log_info() ::Sidekiq.logger.info() if defined?(::Sidekiq.logger) end |
.log_initialization! ⇒ Object
45 46 47 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 45 def log_initialization! log_info("Max-Jobs middleware enabled, shutting down pid: #{pid} after max-jobs threshold reached") end |
.max_jobs(queue) ⇒ Object
49 50 51 52 53 54 55 56 57 58 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 49 def max_jobs(queue) key = max_jobs_key(queue) return cache[key] if cache.include?(key) cache[key] = ( ENV[key] || ENV['MAX_JOBS'] || 100 ).to_i end |
.max_jobs_jitter(queue) ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 64 def max_jobs_jitter(queue) key = max_jobs_jitter_key(queue) return cache[key] if cache.include?(key) cache[key] = rand( ( ENV[key] || ENV['MAX_JOBS_JITTER'] || 1 ).to_i ) end |
.max_jobs_jitter_key(queue) ⇒ Object
77 78 79 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 77 def max_jobs_jitter_key(queue) "MAX_JOBS_JITTER_#{queue.upcase}" end |
.max_jobs_key(queue) ⇒ Object
60 61 62 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 60 def max_jobs_key(queue) "MAX_JOBS_#{queue.upcase}" end |
.max_jobs_with_jitter(queue) ⇒ Object
81 82 83 84 85 86 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 81 def max_jobs_with_jitter(queue) key = max_jobs_with_jitter_key(queue) return cache[key] if cache.include?(key) cache[key] = max_jobs(queue) + max_jobs_jitter(queue) end |
.max_jobs_with_jitter_key(queue) ⇒ Object
88 89 90 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 88 def max_jobs_with_jitter_key(queue) "MAX_JOBS_WITH_JITTER_#{queue.upcase}" end |
.mutex(queue) ⇒ Object
92 93 94 95 96 97 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 92 def mutex(queue) key = mutex_key(queue) return cache[key] if cache.include?(key) cache[key] = ::Mutex.new end |
.mutex_key(queue) ⇒ Object
99 100 101 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 99 def mutex_key(queue) "MUTEX_#{queue.upcase}" end |
.pid ⇒ Object
103 104 105 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 103 def pid @pid ||= ::Process.pid end |
Instance Method Details
#call(_, _, queue) ⇒ Object
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 108 def call( _, # worker-instance _, # item queue ) exception_raised = false begin yield rescue Exception # Set the `exception_raised` boolean to `true` so that the # job-counter *is not* incremented in the `ensure` block exception_raised = true # Re-raise the `Exception` so that _Sidekiq_ can deal w/ it raise ensure if !exception_raised self.class.mutex(queue).synchronize do self.class.increment_counter!(queue) if self.class.counter(queue) == self.class.max_jobs_with_jitter(queue) self.class.log_info("Max-Jobs quota met, shutting down pid: #{self.class.pid}") ::Process.kill('TERM', self.class.pid) end end end end end |