Class: Sidekiq::WorkerKiller

Inherits:
Object
  • Object
Includes:
SidekiqComponent
Defined in:
lib/sidekiq/worker_killer.rb,
lib/sidekiq/worker_killer/version.rb

Overview

Sidekiq server middleware that shuts down the Sidekiq process when its RSS memory exceeds a configured limit. Running jobs get a grace time to finish before the process is stopped.

Constant Summary

VERSION = "1.1.0".freeze
MUTEX = Mutex.new

Instance Method Summary

Constructor Details

#initialize(options = {}) ⇒ WorkerKiller

Returns a new instance of WorkerKiller.

Parameters:

  • options (Hash) (defaults to: {})

Options Hash (options):

  • max_rss (Integer)

    Max RSS in MB. Above this, shutdown will be triggered. (default: `0` (disabled))

  • grace_time (Integer)

    Grace time in seconds. When shutdown is triggered, the Sidekiq process stops accepting new jobs and waits at most this long (15 minutes by default) for running jobs to finish. If Float::INFINITY is specified, it waits forever. (default: `900`)

  • shutdown_wait (Integer)

    Time in seconds (30 by default) that jobs still running after the grace time expires get to stop. After that, the kill signal is sent. (default: `30`)

  • kill_signal (String)

    Signal used to kill the Sidekiq process if it doesn't stop. (default: `"SIGKILL"`)

  • gc (Boolean)

    Try to run garbage collection before the Sidekiq process is stopped when max_rss is exceeded. (default: `true`)

  • skip_shutdown_if (Proc)

    Proc evaluated after max_rss is exceeded but before shutdown is requested. If it returns a truthy value, the shutdown is skipped. (default: `proc { false }`)

  • on_shutdown (Proc)

    Executes a block of code right before a shutdown happens. (default: `nil`)
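
The two hooks can be sketched as plain procs. This sketch assumes both procs are called with the same `(worker, job, queue)` arguments that `#call` receives; the `"critical"` queue name, the `notifications` array, and the sample job hash are hypothetical and only serve the illustration.

```ruby
# Hypothetical sink for shutdown notifications (stand-in for a logger).
notifications = []

# Assumption: a truthy return value skips the shutdown. Here, jobs pulled
# from a hypothetical "critical" queue never trigger it.
skip_shutdown_if = proc { |_worker, _job, queue| queue == "critical" }

# Runs right before the shutdown is requested.
on_shutdown = proc do |_worker, job, queue|
  notifications << "shutting down after job #{job['jid']} on queue #{queue}"
end

# Options hash in the shape documented above.
options = {
  max_rss: 480,          # MB
  grace_time: 15 * 60,   # seconds
  shutdown_wait: 30,     # seconds
  kill_signal: "SIGKILL",
  gc: true,
  skip_shutdown_if: skip_shutdown_if,
  on_shutdown: on_shutdown
}

# Exercise the procs directly with hypothetical sample data.
sample_job = { "jid" => "abc123", "class" => "HardJob" }
skip_critical = options[:skip_shutdown_if].call(nil, sample_job, "critical")
skip_default  = options[:skip_shutdown_if].call(nil, sample_job, "default")
options[:on_shutdown].call(nil, sample_job, "default")
```

Such a hash would be passed to `Sidekiq::WorkerKiller.new`, or as the trailing arguments when the middleware is added to the server chain.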



# File 'lib/sidekiq/worker_killer.rb', line 42

def initialize(options = {})
  @max_rss         = options.fetch(:max_rss, 0)
  @grace_time      = options.fetch(:grace_time, 15 * 60)
  @shutdown_wait   = options.fetch(:shutdown_wait, 30)
  @kill_signal     = options.fetch(:kill_signal, "SIGKILL")
  @gc              = options.fetch(:gc, true)
  @skip_shutdown   = options.fetch(:skip_shutdown_if, proc { false })
  @on_shutdown     = options.fetch(:on_shutdown, nil)
end
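
Because the constructor reads each option with `Hash#fetch`, a default applies only when the key is absent; a key that is present with a `nil` value is used as-is, and string keys are not consulted. A minimal sketch of that behaviour, using a hypothetical `resolve_options` helper that mirrors a few of the fetch calls above (it is not part of the gem):

```ruby
# Hypothetical helper mirroring part of the option handling in #initialize.
def resolve_options(options = {})
  {
    max_rss:       options.fetch(:max_rss, 0),
    grace_time:    options.fetch(:grace_time, 15 * 60),
    shutdown_wait: options.fetch(:shutdown_wait, 30),
    gc:            options.fetch(:gc, true)
  }
end

defaults     = resolve_options
overridden   = resolve_options(max_rss: 480, grace_time: Float::INFINITY)
explicit_nil = resolve_options(gc: nil)
string_keys  = resolve_options("max_rss" => 480)

defaults[:max_rss]      # => 0 (killer disabled)
defaults[:grace_time]   # => 900
overridden[:grace_time] # => Float::INFINITY
explicit_nil[:gc]       # => nil, not the default true
string_keys[:max_rss]   # => 0, the string key is ignored
```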

Instance Method Details

#call(worker, job, queue) { ... } ⇒ Object

Parameters:

  • worker (Object)

    the worker instance that processed the job

  • job (Hash)

    the full job payload (see github.com/mperham/sidekiq/wiki/Job-Format)

  • queue (String)

    the name of the queue the job was pulled from

Yields:

  • the next middleware in the chain or the execution of the job



# File 'lib/sidekiq/worker_killer.rb', line 60

def call(worker, job, queue)
  yield
  # Skip if the max RSS is not exceeded
  return unless @max_rss > 0
  return unless current_rss > @max_rss
  GC.start(full_mark: true, immediate_sweep: true) if @gc
  return unless current_rss > @max_rss
  if skip_shutdown?(worker, job, queue)
    warn "current RSS #{current_rss} exceeds maximum RSS #{@max_rss}, " \
         "however shutdown will be ignored"
    return
  end

  warn "current RSS #{current_rss} of #{identity} exceeds " \
       "maximum RSS #{@max_rss}"

  run_shutdown_hook(worker, job, queue)
  request_shutdown
end
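
The order of checks in `#call` can be sketched as a standalone function. This is an illustration only: `shutdown_decision`, its keyword arguments, and the returned symbols are hypothetical, and the RSS reader and garbage-collection step are injected so the flow can be run without Sidekiq.

```ruby
# Hypothetical stand-in for the checks #call performs after the job has run.
# rss_reader: callable returning the current RSS in MB
# run_gc:     callable standing in for GC.start
# skip:       result that the skip_shutdown_if proc would return
def shutdown_decision(max_rss:, rss_reader:, gc: true, run_gc: -> {}, skip: false)
  return :disabled unless max_rss > 0                  # max_rss of 0 turns the killer off
  return :within_limit unless rss_reader.call > max_rss
  run_gc.call if gc                                    # try to free memory first
  return :recovered_after_gc unless rss_reader.call > max_rss
  return :skipped if skip                              # limit exceeded, shutdown ignored
  :shutdown_requested
end

# RSS drops below the limit after garbage collection, so nothing is shut down.
readings = [600, 300] # MB before and after the GC step
shutdown_decision(max_rss: 480, rss_reader: -> { readings.shift })
# => :recovered_after_gc

# RSS stays above the limit, so shutdown is requested.
shutdown_decision(max_rss: 480, rss_reader: -> { 600 })
# => :shutdown_requested
```

Measuring RSS a second time after the garbage-collection step means a process whose memory can be reclaimed is left running.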

#identity ⇒ Object



# File 'lib/sidekiq/worker_killer.rb', line 144

def identity
  config[:identity] || config["identity"]
end