Class: Resqued::Master

Inherits:
Object
Includes:
Logging, Pidfile, ProclineVersion, Sleepy
Defined in:
lib/resqued/master.rb

Overview

The master process.

  • Spawns a listener.

  • Tracks all work (reported by each listener over an IO pipe).

  • Handles signals.

Constant Summary

SIGNALS =
[:HUP, :INT, :USR2, :CONT, :TERM, :QUIT].freeze
OPTIONAL_SIGNALS =
[:INFO].freeze
OTHER_SIGNALS =
[:CHLD, "EXIT"].freeze
TRAPS =
SIGNALS + OPTIONAL_SIGNALS + OTHER_SIGNALS
SIGNAL_QUEUE =
[] # rubocop:disable Style/MutableConstant

Deliberately mutable: the signal traps push signals onto it and the main loop shifts them off.

Instance Method Summary

Methods included from Sleepy

#awake, #self_pipe

Methods included from ProclineVersion

#procline_version

Methods included from Pidfile

#remove_pidfile, #with_pidfile, #write_pidfile

Methods included from Logging

build_logger, close_log, #log, log_file, log_file=, #log_to_stdout?, logger, logging_io, #reopen_logs

Constructor Details

#initialize(state, options = {}) ⇒ Master

Returns a new instance of Master.



21
22
23
24
25
26
# File 'lib/resqued/master.rb', line 21

# Builds a master around the shared +state+ object.
#
# state   - the master's state. It is handed to ListenerPool.new, and the
#           master later reads pidfile, paused, exec_on_hup, fast_exit and
#           listeners_created from it.
# options - Hash of optional settings:
#           :status_pipe - object responding to #write that receives
#                          "type,pid,status\n" lines (see #status_message);
#                          nil (the default) disables status reporting.
def initialize(state, options = {})
  @state = state
  @status_pipe = options.fetch(:status_pipe, nil)
  @listeners = ListenerPool.new(state)
  @listener_backoff = Backoff.new
end

Instance Method Details

#dump_object_counts ⇒ Object

Private: logs GC statistics and the ten most common object classes (run on SIGINFO).



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/resqued/master.rb', line 84

# Private: logs GC statistics and the ten most common classes on the heap.
#
# Every live object yielded by ObjectSpace.each_object is counted under its
# class name. On later calls, each row also shows the change since the
# previous dump, e.g. "   1200 String (+35)". Any StandardError is logged
# rather than raised.
def dump_object_counts
  log GC.stat.inspect
  # BasicObject instances have no #class method, so calling o.class on them
  # raised NoMethodError and aborted the whole dump. Kernel#class bound via
  # an UnboundMethod works for any object. It is used only for objects that
  # do not include Kernel, to avoid allocating a Method per object.
  class_of = Kernel.instance_method(:class)
  counts = Hash.new(0)
  total = 0
  ObjectSpace.each_object do |o|
    klass = Kernel === o ? o.class : class_of.bind(o).call
    counts[klass.name] += 1
    total += 1
  end
  top = 10
  log "#{total} objects. top #{top}:"
  counts.sort_by { |_, count| -count }.first(top).each do |name, count|
    # Only show a delta for classes that were present in the previous dump.
    diff = @last_counts&.key?(name) ? format(" (%+d)", count - @last_counts[name]) : ""
    log "   #{count} #{name}#{diff}"
  end
  @last_counts = counts
  log GC.stat.inspect
rescue => e
  log "Error while counting objects: #{e}"
end

#go_ham ⇒ Object

Private: the main loop.



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/resqued/master.rb', line 44

# Private: the master's main loop.
#
# Each pass reads status from the listeners, reaps any listener that has
# already exited (without blocking), starts a listener unless paused, and
# then handles at most one queued signal. Returns only after INT, TERM, or
# QUIT.
def go_ham
  # If we're resuming, we'll want to recycle the existing listener now.
  prepare_new_listener

  loop do
    read_listeners
    reap_all_listeners(Process::WNOHANG)
    # start_listener returns early when a listener is already current or
    # the backoff says to wait.
    start_listener unless @state.paused
    # SIGNAL_QUEUE is filled by the trap handlers in install_signal_handlers.
    case signal = SIGNAL_QUEUE.shift
    when nil
      # Nothing queued: sleep for the backoff delay, or 30 seconds when the
      # backoff returns nil. NOTE(review): presumably Sleepy#yawn also wakes
      # early on listener pipe activity or #awake (not visible here).
      yawn(@listener_backoff.how_long? || 30.0)
    when :INFO
      dump_object_counts
    when :HUP
      if @state.exec_on_hup
        log "Execing a new master"
        # NOTE(review): presumably replaces this process; if exec! returns,
        # execution falls through to the in-process reload below.
        ExecOnHUP.exec!(@state)
      end
      reopen_logs
      log "Restarting listener with new configuration and application."
      prepare_new_listener
    when :USR2
      # Pause: stop the current listener and forget it. While paused,
      # start_listener is not called, so no replacement is started.
      log "Pause job processing"
      @state.paused = true
      kill_listener(:QUIT, @listeners.current)
      @listeners.clear_current!
    when :CONT
      # Resume: forward CONT to every listener; the next pass starts a new
      # listener if none is current.
      log "Resume job processing"
      @state.paused = false
      kill_all_listeners(:CONT)
    when :INT, :TERM, :QUIT
      # Forward the shutdown signal to all listeners, then (unless fast_exit)
      # block until they have been reaped.
      log "Shutting down..."
      kill_all_listeners(signal)
      wait_for_workers unless @state.fast_exit
      break
    end
  end
end

#install_signal_handlers ⇒ Object



225
226
227
228
229
# File 'lib/resqued/master.rb', line 225

# Installs the master's signal traps.
#
# CHLD only wakes the main loop; listeners are reaped there. Each signal in
# SIGNALS and OPTIONAL_SIGNALS is pushed onto SIGNAL_QUEUE and the loop is
# woken. The handlers do nothing else; the real handling happens in go_ham.
#
# Signals in OPTIONAL_SIGNALS may not exist on every platform (SIGINFO is
# absent on Linux). Signal.trap raises ArgumentError for those, and they are
# skipped. Previously an inline `rescue nil` also hid any other error here.
def install_signal_handlers
  trap(:CHLD) { awake }
  SIGNALS.each { |signal| trap(signal) { SIGNAL_QUEUE << signal; awake } }
  OPTIONAL_SIGNALS.each do |signal|
    begin
      trap(signal) { SIGNAL_QUEUE << signal; awake }
    rescue ArgumentError
      # Signal not supported on this platform; nothing to install.
      nil
    end
  end
end

#kill_all_listeners(signal) ⇒ Object



179
180
181
182
183
# File 'lib/resqued/master.rb', line 179

# Sends +signal+ to every listener yielded by the pool's #each.
def kill_all_listeners(signal)
  @listeners.each { |listener| listener.kill(signal) }
end

#kill_listener(signal, listener) ⇒ Object



175
176
177
# File 'lib/resqued/master.rb', line 175

# Sends +signal+ to +listener+. A nil listener (e.g. no current listener) is
# ignored and nil is returned.
def kill_listener(signal, listener)
  return if listener.nil?

  listener.kill(signal)
end

#listener_running(listener) ⇒ Object

Listener message: A listener finished booting and is ready to start workers.

If it is the current listener, retires the previous (last good) listener; otherwise the listener was already superseded and is sent QUIT again.



144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/resqued/master.rb', line 144

# Listener message: a listener finished booting and is ready for workers.
#
# If +listener+ is the pool's current listener, the last good listener is
# sent QUIT and cleared from the pool. Otherwise +listener+ has already been
# superseded and is sent QUIT again.
def listener_running(listener)
  listener_status(listener, "ready")
  # A non-current listener probably missed our earlier SIGQUIT, which was
  # likely sent before it had set up its traps. We have moved on, so kill it
  # again.
  return kill_listener(:QUIT, listener) unless listener == @listeners.current

  kill_listener(:QUIT, @listeners.last_good)
  @listeners.clear_last_good!
end

#listener_status(listener, status) ⇒ Object



252
253
254
255
256
# File 'lib/resqued/master.rb', line 252

# Reports a listener lifecycle +status+ ("start", "ready", "stop") via
# status_message. A nil listener, or one without a pid, is skipped.
def listener_status(listener, status)
  return unless listener&.pid

  status_message("listener", listener.pid, status)
end

#no_more_unexpected_exits ⇒ Object



240
241
242
# File 'lib/resqued/master.rb', line 240

# Resets the EXIT trap (installed by report_unexpected_exits) to "DEFAULT".
# run calls this after its main work completes normally, so a clean exit is
# not logged as unexpected.
def no_more_unexpected_exits
  trap("EXIT", "DEFAULT")
end

#prepare_new_listener ⇒ Object

Private: Spin up a new listener.

The old one will be killed when the new one is ready for workers.



160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/resqued/master.rb', line 160

# Private: arrange for a new listener to be started. The old one is killed
# once the new one reports it is ready for workers (see listener_running).
def prepare_new_listener
  # Usual case: let the pool cycle its current listener.
  # NOTE(review): ListenerPool#cycle_current is defined elsewhere.
  return @listeners.cycle_current unless @listeners.last_good

  # The last good listener is still running because another HUP arrived
  # before the new listener finished booting. Keep the last good listener
  # (it has the workers), kill the still-booting current one, and clear it
  # so that start_listener starts a fresh one.
  kill_listener(:QUIT, @listeners.current)
  @listeners.clear_current!
end

#read_listeners ⇒ Object



119
120
121
122
123
# File 'lib/resqued/master.rb', line 119

# Asks each listener to read its worker status, passing this master as the
# on_activity receiver so the listener can call back into it.
def read_listeners
  @listeners.each { |listener| listener.read_worker_status(on_activity: self) }
end

#reap_all_listeners(waitpid_flags = 0) ⇒ Object



189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/resqued/master.rb', line 189

# Reaps exited child processes and updates the listener pool for each one.
#
# waitpid_flags - flags for Process.waitpid2. With Process::WNOHANG this
#                 returns as soon as no exited child is pending; with the
#                 default 0 it blocks until the pool is empty.
#
# For each reaped pid:
#   - a dead current listener is reported to the backoff (#died) and cleared;
#   - a dead last-good listener is cleared;
#   - the pool entry is removed, reported as "stop", and disposed.
#
# Always returns nil. It stops quietly when there are no children left to
# wait for (Errno::ECHILD).
def reap_all_listeners(waitpid_flags = 0)
  until @listeners.empty?
    exited_pid, exit_status = Process.waitpid2(-1, waitpid_flags)
    break if exited_pid.nil?

    log "Listener exited #{exit_status}"

    if @listeners.current_pid == exited_pid
      @listener_backoff.died
      @listeners.clear_current!
    end

    @listeners.clear_last_good! if @listeners.last_good_pid == exited_pid

    dead_listener = @listeners.delete(exited_pid)
    if dead_listener
      listener_status dead_listener, "stop"
      dead_listener.dispose
    end

    write_procline
  end
rescue Errno::ECHILD
  nil
end

#report_unexpected_exits ⇒ Object



231
232
233
234
235
236
237
238
# File 'lib/resqued/master.rb', line 231

# Installs an EXIT trap that logs the exception ($!) the process is exiting
# with, followed by each line of its backtrace. An exit without an exception
# logs "EXIT nil". no_more_unexpected_exits removes this trap.
def report_unexpected_exits
  trap("EXIT") do
    error = $!
    log("EXIT #{error.inspect}")
    Array(error&.backtrace).each { |frame| log(frame) }
  end
end

#run(ready_pipe = nil) ⇒ Object

Public: Starts the master process.



29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/resqued/master.rb', line 29

# Public: Starts the master process and runs it until shutdown.
#
# ready_pipe - optional IO. After the signal handlers are installed, this
#              process's pid is written to it with syswrite, and it is closed.
#              NOTE(review): presumably this tells the launching process that
#              the master is up; confirm against the caller.
#
# The EXIT trap from report_unexpected_exits is installed first. It is removed
# only after the with_pidfile block completes normally, so an exception that
# escapes setup or the main loop is logged with its backtrace at exit.
def run(ready_pipe = nil)
  report_unexpected_exits
  # NOTE(review): with_pidfile comes from Pidfile (not shown). Presumably it
  # writes the pidfile before the block and removes it afterwards.
  with_pidfile(@state.pidfile) do
    write_procline
    # Traps go in before readiness is reported, so signals sent once the pid
    # has been written are queued for go_ham rather than getting default
    # handling.
    install_signal_handlers
    if ready_pipe
      ready_pipe.syswrite($$.to_s)
      # Best-effort close: any StandardError raised by close is ignored.
      ready_pipe.close rescue nil
    end
    go_ham
  end
  no_more_unexpected_exits
end

#start_listener ⇒ Object



110
111
112
113
114
115
116
117
# File 'lib/resqued/master.rb', line 110

# Starts a new listener, unless one is already current or the backoff
# (@listener_backoff.wait?) says to hold off. On start it reports "start" on
# the status pipe, tells the backoff a listener was started, and refreshes
# the procline.
def start_listener
  return if @listeners.current
  return if @listener_backoff.wait?

  new_listener = @listeners.start!
  listener_status new_listener, "start"
  @listener_backoff.started
  write_procline
end

#status_message(type, pid, status) ⇒ Object



262
263
264
# File 'lib/resqued/master.rb', line 262

# Writes a single "type,pid,status" line to the status pipe. Does nothing
# (and returns nil) when no status pipe was configured.
def status_message(type, pid, status)
  return if @status_pipe.nil?

  line = "#{type},#{pid},#{status}\n"
  @status_pipe.write(line)
end

#wait_for_workers ⇒ Object



185
186
187
# File 'lib/resqued/master.rb', line 185

# Blocks until every listener has been reaped. It calls reap_all_listeners
# with flags 0, i.e. without WNOHANG. Used during shutdown unless fast_exit
# is set.
def wait_for_workers
  reap_all_listeners(0)
end

#worker_finished(pid) ⇒ Object

Listener message: A worker just stopped working.

Forwards the message to every listener in the pool.



133
134
135
136
137
138
# File 'lib/resqued/master.rb', line 133

# Listener message: a worker just stopped working.
#
# Reports "stop" for +pid+ on the status pipe, then forwards the event to
# every listener yielded by the pool's #each.
def worker_finished(pid)
  worker_status(pid, "stop")
  @listeners.each { |listener| listener.worker_finished(pid) }
end

#worker_started(pid) ⇒ Object

Listener message: A worker just started working.



126
127
128
# File 'lib/resqued/master.rb', line 126

# Listener message: a worker just started working.
#
# pid - the worker's process id; reported with status "start" via
#       worker_status.
def worker_started(pid)
  worker_status(pid, "start")
end

#worker_status(pid, status) ⇒ Object



258
259
260
# File 'lib/resqued/master.rb', line 258

# Reports a worker lifecycle +status+ ("start" or "stop", as sent by
# worker_started and worker_finished) through status_message with type
# "worker".
def worker_status(pid, status)
  status_message("worker", pid, status)
end

#write_procline ⇒ Object



248
249
250
# File 'lib/resqued/master.rb', line 248

# Sets the process title ($0) to show the version, the listener generation,
# how many listeners are in the pool, and the original command-line arguments.
def write_procline
  parts = [
    procline_version,
    "master",
    "[gen #{@state.listeners_created}]",
    "[#{@listeners.size} running]",
    ARGV.join(' '),
  ]
  $0 = parts.join(" ")
end

#yawn(duration) ⇒ Object



244
245
246
# File 'lib/resqued/master.rb', line 244

# Sleeps for up to +duration+ seconds via the included Sleepy#yawn, also
# handing it every listener's read pipe.
# NOTE(review): presumably so that activity on a pipe ends the sleep early.
def yawn(duration)
  pipes = @listeners.map(&:read_pipe)
  super(duration, pipes)
end