Class: Sidekiq::ProcessManager::Manager

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/process_manager/manager.rb

Overview

Process manager for sidekiq. This class is responsible for starting and monitoring that the specified number of sidekiq processes are running. It will also forward signals sent to the main process to the child processes.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(process_count: 1, prefork: false, preboot: nil, max_memory: nil, mode: nil, silent: false) ⇒ Manager

Create a new process manager.

Parameters:

  • process_count (Integer) (defaults to: 1)

    The number of sidekiq processes to start.

  • prefork (Boolean) (defaults to: false)

    If true, the process manager will load the application before forking.

  • preboot (String) (defaults to: nil)

    If set, the process manager will require the specified file before forking the child processes.

  • mode (Symbol) (defaults to: nil)

    If set to :testing, the process manager will use a mock CLI.

  • silent (Boolean) (defaults to: false)

    If true, the process manager will not output any messages.

Raises:

  • (ArgumentError)


21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/sidekiq/process_manager/manager.rb', line 21

def initialize(process_count: 1, prefork: false, preboot: nil, max_memory: nil, mode: nil, silent: false)
  require "sidekiq/cli"

  # Get the number of processes to fork
  @process_count = process_count
  raise ArgumentError.new("Process count must be greater than 1") if @process_count < 1

  @prefork = (prefork && process_count > 1)
  @preboot = preboot if process_count > 1 && !prefork
  @max_memory = ((max_memory.to_i > 0) ? max_memory.to_i : nil)

  if mode == :testing
    require_relative "../../../spec/support/mocks"
    @cli = MockSidekiqCLI.new(silent)
    @memory_check_interval = 1
  else
    @cli = Sidekiq::CLI.instance
    @memory_check_interval = 60
  end

  @silent = silent
  @pids = []
  @terminated_pids = []
  @started = false
  @mutex = Mutex.new
end

Instance Attribute Details

#cliObject (readonly)

Returns the value of attribute cli.



12
13
14
# File 'lib/sidekiq/process_manager/manager.rb', line 12

def cli
  @cli
end

Instance Method Details

#pidsArray<Integer>

Get all chile process pids.

Returns:

  • (Array<Integer>)


139
140
141
# File 'lib/sidekiq/process_manager/manager.rb', line 139

def pids
  @mutex.synchronize { @pids.dup }
end

#startvoid

This method returns an undefined value.

Start the process manager. This method will start the specified number of sidekiq processes and monitor them. It will only exit once all child processes have exited. If a child process dies unexpectedly, it will be restarted.

Child processes are manged by sending the signals you would normally send to a sidekiq process to the process manager instead.



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/sidekiq/process_manager/manager.rb', line 57

def start
  raise "Process manager already started" if started?
  @started = true

  load_sidekiq

  master_pid = ::Process.pid

  @signal_pipe_read, @signal_pipe_write = IO.pipe

  @signal_thread = Thread.new do
    Thread.current.name = "signal-handler"

    while @signal_pipe_read.wait_readable
      begin
        signal = @signal_pipe_read.gets.strip
        send_signal_to_children(signal.to_sym)
      rescue => e
        log_error("Error handling signal #{signal}: #{e.message}")
      end
    end
  end

  # Trap signals that will be forwarded to child processes
  [:INT, :TERM, :USR1, :USR2, :TSTP, :TTIN].each do |signal|
    ::Signal.trap(signal) do
      if ::Process.pid == master_pid
        @signal_pipe_write.puts(signal)
      end
    end
  end

  # Ensure that child processes receive the term signal when the master process exits.
  at_exit do
    if ::Process.pid == master_pid && @process_count > 0
      send_signal_to_children(:TERM)
    end
  end

  GC.start
  GC.compact if GC.respond_to?(:compact)
  # I'm not sure why, but running GC operations blocks until we try to write some I/O.
  File.write("/dev/null", "0")

  @process_count.times do
    start_child_process!
  end

  start_memory_monitor if @max_memory

  log_info("Process manager started with pid #{::Process.pid}")
  monitor_child_processes
  log_info("Process manager #{::Process.pid} exiting")
end

#started?Boolean

Return true if the process manager has started.

Returns:

  • (Boolean)


146
147
148
# File 'lib/sidekiq/process_manager/manager.rb', line 146

def started?
  @started
end

#stopvoid

This method returns an undefined value.

Helper to gracefully stop all child processes.



129
130
131
132
133
134
# File 'lib/sidekiq/process_manager/manager.rb', line 129

def stop
  stop_memory_monitor
  @process_count = 0
  send_signal_to_children(:TSTP)
  send_signal_to_children(:TERM)
end

#wait(timeout = 5) ⇒ void

This method returns an undefined value.

Helper to wait on the manager to wait on child processes to start up.

Parameters:

  • timeout (Integer) (defaults to: 5)

    The number of seconds to wait for child processes to start.

Raises:

  • (Timeout::Error)


116
117
118
119
120
121
122
123
124
# File 'lib/sidekiq/process_manager/manager.rb', line 116

def wait(timeout = 5)
  timeout_time = monotonic_time + timeout
  while monotonic_time <= timeout_time
    return if @pids.size == @process_count
    sleep(0.01)
  end

  raise Timeout::Error.new("child processes failed to start in #{timeout} seconds")
end