Class: Sidekiq::ProcessManager::Manager

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/process_manager/manager.rb

Overview

Process manager for sidekiq. This class is responsible for starting and monitoring that the specified number of sidekiq processes are running. It will also forward signals sent to the main process to the child processes.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(process_count: 1, prefork: false, preboot: nil, max_memory: nil, mode: nil, silent: false) ⇒ Manager

Create a new process manager.

Parameters:

  • process_count (Integer) (defaults to: 1)

    The number of sidekiq processes to start.

  • prefork (Boolean) (defaults to: false)

    If true, the process manager will load the application before forking.

  • preboot (String) (defaults to: nil)

    If set, the process manager will require the specified file before forking the child processes.

  • mode (Symbol) (defaults to: nil)

    If set to :testing, the process manager will use a mock CLI.

  • silent (Boolean) (defaults to: false)

    If true, the process manager will not output any messages.

Raises:

  • (ArgumentError)


21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/sidekiq/process_manager/manager.rb', line 21

def initialize(process_count: 1, prefork: false, preboot: nil, max_memory: nil, mode: nil, silent: false)
  require "sidekiq/cli"

  # Get the number of processes to fork
  @process_count = process_count
  raise ArgumentError.new("Process count must be greater than 1") if @process_count < 1

  @prefork = (prefork && process_count > 1)
  @preboot = preboot if process_count > 1 && !prefork
  @max_memory = ((max_memory.to_i > 0) ? max_memory.to_i : nil)

  if mode == :testing
    require_relative "../../../spec/support/mocks"
    @cli = MockSidekiqCLI.new(silent)
    @memory_check_interval = 1
  else
    @cli = Sidekiq::CLI.instance
    @memory_check_interval = 60
  end

  @silent = silent
  @pids = []
  @terminated_pids = []
  @started = false
  @mutex = Mutex.new
end

Instance Attribute Details

#cliObject (readonly)

Returns the value of attribute cli.



12
13
14
# File 'lib/sidekiq/process_manager/manager.rb', line 12

def cli
  @cli
end

Instance Method Details

#pidsArray<Integer>

Get all chile process pids.

Returns:

  • (Array<Integer>)


143
144
145
# File 'lib/sidekiq/process_manager/manager.rb', line 143

def pids
  @mutex.synchronize { @pids.dup }
end

#startvoid

This method returns an undefined value.

Start the process manager. This method will start the specified number of sidekiq processes and monitor them. It will only exit once all child processes have exited. If a child process dies unexpectedly, it will be restarted.

Child processes are manged by sending the signals you would normally send to a sidekiq process to the process manager instead.



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/sidekiq/process_manager/manager.rb', line 57

def start
  raise "Process manager already started" if started?
  @started = true

  load_sidekiq

  master_pid = ::Process.pid

  @signal_pipe_read, @signal_pipe_write = IO.pipe

  @signal_thread = Thread.new do
    Thread.current.name = "signal-handler"

    while @signal_pipe_read.wait_readable
      begin
        signal = @signal_pipe_read.gets.strip
        send_signal_to_children(signal.to_sym)
      rescue => e
        log_warning("Error sending signal #{signal} to child processes: #{e.message}")
      end
    end
  end

  # Trap signals that will be forwarded to child processes
  [:INT, :TERM, :USR1, :USR2, :TSTP, :TTIN].each do |signal|
    ::Signal.trap(signal) do
      if ::Process.pid == master_pid
        signal = :TERM if signal == :INT
        @signal_pipe_write.puts(signal)
      end
    end
  end

  # Ensure that child processes receive the term signal when the master process exits.
  at_exit do
    if ::Process.pid == master_pid
      if @process_count > 0
        send_signal_to_children(:TERM)
      end
      wait_for_children_to_exit
      log_info("Process manager exiting")
    end
  end

  GC.start
  GC.compact if GC.respond_to?(:compact)
  # I'm not sure why, but running GC operations blocks until we try to write some I/O.
  File.write("/dev/null", "0")

  @process_count.times do
    start_child_process!
  end

  start_memory_monitor if @max_memory

  log_info("Process manager started")
  monitor_child_processes
end

#started?Boolean

Return true if the process manager has started.

Returns:

  • (Boolean)


150
151
152
# File 'lib/sidekiq/process_manager/manager.rb', line 150

def started?
  @started
end

#stopvoid

This method returns an undefined value.

Helper to gracefully stop all child processes.



133
134
135
136
137
138
# File 'lib/sidekiq/process_manager/manager.rb', line 133

def stop
  stop_memory_monitor
  @process_count = 0
  send_signal_to_children(:TSTP)
  send_signal_to_children(:TERM)
end

#wait(timeout = 5) ⇒ void

This method returns an undefined value.

Helper to wait on the manager to wait on child processes to start up.

Parameters:

  • timeout (Integer) (defaults to: 5)

    The number of seconds to wait for child processes to start.

Raises:

  • (Timeout::Error)


120
121
122
123
124
125
126
127
128
# File 'lib/sidekiq/process_manager/manager.rb', line 120

def wait(timeout = 5)
  timeout_time = monotonic_time + timeout
  while monotonic_time <= timeout_time
    return if @pids.size == @process_count
    sleep(0.01)
  end

  raise Timeout::Error.new("child processes failed to start in #{timeout} seconds")
end