Class: Sidekiq::Middleware::Server::MaxJobs

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/middleware/server/max_jobs.rb

Constant Summary collapse

VERSION =
File.read(
  File.join(
    File.dirname(__FILE__),
    '..',
    '..',
    '..',
    '..',
    'VERSION'
  )
).strip

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.cacheObject



19
20
21
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 19

def cache
  @cache ||= {}
end

.counter(queue) ⇒ Object



23
24
25
26
27
28
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 23

def counter(queue)
  key = counter_key(queue)
  return cache[key] if cache.include?(key)

  cache[key] = 0
end

.counter_key(queue) ⇒ Object



30
31
32
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 30

def counter_key(queue)
  "COUNTER_#{queue.upcase}"
end

.increment_counter!(queue) ⇒ Object



34
35
36
37
38
39
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 34

def increment_counter!(queue)
  key = counter_key(queue)
  counter = cache[key] || 0

  cache[key] = counter.next
end

.log_info(message) ⇒ Object



41
42
43
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 41

def log_info(message)
  ::Sidekiq.logger.info(message) if defined?(::Sidekiq.logger)
end

.log_initialization!Object



45
46
47
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 45

def log_initialization!
  log_info("Max-Jobs middleware enabled, shutting down pid: #{pid} after max-jobs threshold reached")
end

.max_jobs(queue) ⇒ Object



49
50
51
52
53
54
55
56
57
58
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 49

def max_jobs(queue)
  key = max_jobs_key(queue)
  return cache[key] if cache.include?(key)

  cache[key] = (
    ENV[key] ||
    ENV['MAX_JOBS'] ||
    100
  ).to_i
end

.max_jobs_jitter(queue) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 64

def max_jobs_jitter(queue)
  key = max_jobs_jitter_key(queue)
  return cache[key] if cache.include?(key)

  cache[key] = rand(
    (
      ENV[key] ||
      ENV['MAX_JOBS_JITTER'] ||
      1
    ).to_i
  )
end

.max_jobs_jitter_key(queue) ⇒ Object



77
78
79
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 77

def max_jobs_jitter_key(queue)
  "MAX_JOBS_JITTER_#{queue.upcase}"
end

.max_jobs_key(queue) ⇒ Object



60
61
62
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 60

def max_jobs_key(queue)
  "MAX_JOBS_#{queue.upcase}"
end

.max_jobs_with_jitter(queue) ⇒ Object



81
82
83
84
85
86
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 81

def max_jobs_with_jitter(queue)
  key = max_jobs_with_jitter_key(queue)
  return cache[key] if cache.include?(key)

  cache[key] = max_jobs(queue) + max_jobs_jitter(queue)
end

.max_jobs_with_jitter_key(queue) ⇒ Object



88
89
90
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 88

def max_jobs_with_jitter_key(queue)
  "MAX_JOBS_WITH_JITTER_#{queue.upcase}"
end

.mutex(queue) ⇒ Object



92
93
94
95
96
97
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 92

def mutex(queue)
  key = mutex_key(queue)
  return cache[key] if cache.include?(key)

  cache[key] = ::Mutex.new
end

.mutex_key(queue) ⇒ Object



99
100
101
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 99

def mutex_key(queue)
  "MUTEX_#{queue.upcase}"
end

.pidObject



103
104
105
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 103

def pid
  @pid ||= ::Process.pid
end

Instance Method Details

#call(_, _, queue) ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 108

def call(
  _,     # worker-instance
  _,     # item
  queue
)
  exception_raised = false
  begin
    yield
  rescue Exception
    # Set the `exception_raised` boolean to `true` so that the
    # job-counter *is not* incremented in the `ensure` block
    exception_raised = true
    # Re-raise the `Exception` so that _Sidekiq_ can deal w/ it
    raise
  ensure
    if !exception_raised
      self.class.mutex(queue).synchronize do
        self.class.increment_counter!(queue)

        if self.class.counter(queue) == self.class.max_jobs_with_jitter(queue)
          self.class.log_info("Max-Jobs quota met, shutting down pid: #{self.class.pid}")
          ::Process.kill('TERM', self.class.pid)
        end
      end
    end
  end
end