Class: Resqued::Master

Inherits:
Object
  • Object
show all
Includes:
Logging, Pidfile, ProclineVersion, Sleepy
Defined in:
lib/resqued/master.rb

Overview

The master process.

  • Spawns a listener.

  • Tracks all work (reported by listeners over an IO pipe).

  • Handles signals.

Constant Summary

# Signals the master traps and queues for handling in the main loop.
SIGNALS = [:HUP, :INT, :USR1, :USR2, :CONT, :TERM, :QUIT].freeze
# Signals trapped only where the platform supports them (e.g. SIGINFO).
OPTIONAL_SIGNALS = [:INFO].freeze
# Other traps the master installs: child exits and process exit.
OTHER_SIGNALS = [:CHLD, "EXIT"].freeze
# Every trap name the master may install.
TRAPS = SIGNALS + OPTIONAL_SIGNALS + OTHER_SIGNALS
# Filled by trap handlers and drained by the main loop, so it must stay mutable.
SIGNAL_QUEUE = [] # rubocop:disable Style/MutableConstant

Instance Method Summary

Methods included from Sleepy

#awake, #self_pipe

Methods included from ProclineVersion

#procline_version

Methods included from Pidfile

#remove_pidfile, #with_pidfile, #write_pidfile

Methods included from Logging

build_logger, close_log, #log, log_file, log_file=, #log_to_stdout?, logger, logging_io, #reopen_logs

Constructor Details

#initialize(state, options = {}) ⇒ Master

Returns a new instance of Master.



21
22
23
24
25
26
# File 'lib/resqued/master.rb', line 21

def initialize(state, options = {})
  # Optional IO for "type,pid,status" lines; nil means no status reporting.
  @status_pipe = options.fetch(:status_pipe) { nil }
  @state = state
  @listeners = ListenerPool.new(state)
  @listener_backoff = Backoff.new
end

Instance Method Details

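#dump_object_counts ⇒ Object

Private: Logs GC statistics and the ten most common object classes, with the change since the previous dump.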



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/resqued/master.rb', line 83

# Private: log GC statistics plus a histogram of live objects by class name.
#
# Logs GC.stat, the total object count, the ten most common class names
# (each with its delta since the previous call, when that class appeared
# in it), and GC.stat again. Any StandardError is logged, not raised.
def dump_object_counts
  log GC.stat.inspect
  # BasicObject instances don't respond to #class; bind Kernel#class to them
  # explicitly so a single such object can't abort the whole dump.
  kernel_class = Kernel.instance_method(:class)
  counts = Hash.new(0)
  total = 0
  ObjectSpace.each_object do |o|
    klass = Kernel === o ? o.class : kernel_class.bind(o).call
    counts[klass.name] += 1
    total += 1
  end
  top = 10
  log "#{total} objects. top #{top}:"
  counts.sort_by { |_, count| -count }.first(top).each do |name, count|
    # fetch ignores the Hash default, so classes new since last time get no delta.
    last = @last_counts&.fetch(name, nil)
    diff = last ? format(" (%+d)", count - last) : ""
    log "   #{count} #{name}#{diff}"
  end
  @last_counts = counts
  log GC.stat.inspect
rescue => e
  log "Error while counting objects: #{e}"
end

#go_ham ⇒ Object

Private: The main loop.



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/resqued/master.rb', line 44

# Private: the master's main loop.
#
# Each pass reads status messages from the listener pipes, reaps any exited
# listeners without blocking, starts a listener if none is current (unless
# paused), and then handles at most one queued signal. With no queued signal,
# it sleeps via #yawn for the backoff delay, or 30 seconds when the backoff
# reports none. The loop exits only on INT, TERM or QUIT.
def go_ham
  # If we're resuming, we'll want to recycle the existing listener now.
  prepare_new_listener

  loop do
    read_listeners
    reap_all_listeners(Process::WNOHANG)
    start_listener unless @state.paused
    case signal = SIGNAL_QUEUE.shift
    when nil
      # Nothing queued: sleep until woken (presumably by a trap calling #awake
      # or a readable listener pipe -- see Sleepy) or the timeout elapses.
      yawn(@listener_backoff.how_long? || 30.0)
    when :INFO
      dump_object_counts
    when :HUP
      reopen_logs
      log "Restarting listener with new configuration and application."
      prepare_new_listener
    when :USR1
      log "Execing a new master"
      # Assumed not to return when the exec succeeds -- confirm in ReplaceMaster.
      ReplaceMaster.exec!(@state)
    when :USR2
      log "Pause job processing"
      @state.paused = true
      # NOTE(review): only the current listener is sent QUIT here; a last_good
      # listener, if one exists, is not signalled -- confirm this is intended.
      kill_listener(:QUIT, @listeners.current)
      @listeners.clear_current!
    when :CONT
      log "Resume job processing"
      @state.paused = false
      kill_all_listeners(:CONT)
    when :INT, :TERM, :QUIT
      log "Shutting down..."
      # Forward the same signal so listeners shut down the same way.
      kill_all_listeners(signal)
      wait_for_workers unless @state.fast_exit
      break
    end
  end
end

#install_signal_handlers ⇒ Object



224
225
226
227
228
# File 'lib/resqued/master.rb', line 224

# Private: install the master's signal traps.
#
# SIGCHLD only wakes the main loop so it can reap. Every signal in SIGNALS and
# OPTIONAL_SIGNALS is pushed onto SIGNAL_QUEUE and then wakes the main loop.
# An optional signal the platform doesn't support (trap raises ArgumentError,
# e.g. SIGINFO on Linux) is skipped. Any other error now propagates instead of
# being silently swallowed.
def install_signal_handlers
  trap(:CHLD) { awake }
  SIGNALS.each { |signal| trap(signal) { SIGNAL_QUEUE << signal; awake } }
  OPTIONAL_SIGNALS.each do |signal|
    begin
      trap(signal) { SIGNAL_QUEUE << signal; awake }
    rescue ArgumentError
      nil
    end
  end
end

#kill_all_listeners(signal) ⇒ Object



178
179
180
181
182
# File 'lib/resqued/master.rb', line 178

# Private: send +signal+ to every listener in the pool.
def kill_all_listeners(signal)
  @listeners.each { |listener| listener.kill(signal) }
end

#kill_listener(signal, listener) ⇒ Object



174
175
176
# File 'lib/resqued/master.rb', line 174

# Private: send +signal+ to +listener+; a nil listener is ignored (returns nil).
def kill_listener(signal, listener)
  return nil if listener.nil?

  listener.kill(signal)
end

#listener_running(listener) ⇒ Object

Listener message: A listener finished booting and is ready to start workers.

Promotes a booting listener to be the current listener.



143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/resqued/master.rb', line 143

# Listener message: +listener+ has finished booting.
#
# When it is the current listener, the previous good listener is sent QUIT and
# forgotten. Otherwise it is a listener we already asked to QUIT, probably
# before its traps were in place, so it is asked again.
def listener_running(listener)
  listener_status(listener, "ready")
  return kill_listener(:QUIT, listener) unless listener == @listeners.current

  kill_listener(:QUIT, @listeners.last_good)
  @listeners.clear_last_good!
end

#listener_status(listener, status) ⇒ Object



251
252
253
254
255
# File 'lib/resqued/master.rb', line 251

# Private: report a listener lifecycle +status+ on the status pipe.
# Does nothing when there is no listener or it has no pid.
def listener_status(listener, status)
  pid = listener&.pid
  return unless pid

  status_message("listener", pid, status)
end

#no_more_unexpected_exits ⇒ Object



239
240
241
# File 'lib/resqued/master.rb', line 239

# Private: restore the default EXIT handler, so exiting from here on is not
# logged as unexpected.
def no_more_unexpected_exits
  Signal.trap("EXIT", "DEFAULT")
end

#prepare_new_listener ⇒ Object

Private: Spin up a new listener.

The old one will be killed when the new one is ready for workers.



159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/resqued/master.rb', line 159

# Private: arrange for a fresh listener to be started.
#
# Without a last_good listener, the pool simply cycles its current listener.
# With one, another HUP arrived before the booting listener finished. The
# last_good listener (which owns the workers) is kept, the half-booted current
# listener is sent QUIT, and current is cleared so start_listener boots a
# replacement.
def prepare_new_listener
  return @listeners.cycle_current unless @listeners.last_good

  kill_listener(:QUIT, @listeners.current)
  @listeners.clear_current!
end

#read_listeners ⇒ Object



118
119
120
121
122
# File 'lib/resqued/master.rb', line 118

# Private: let each listener read its pending worker status, calling back into
# this master (worker_started / worker_finished / listener_running).
def read_listeners
  @listeners.each { |listener| listener.read_worker_status(on_activity: self) }
end

#reap_all_listeners(waitpid_flags = 0) ⇒ Object



188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# File 'lib/resqued/master.rb', line 188

# Private: reap exited child processes and update listener bookkeeping.
#
# waitpid_flags - flags for Process.waitpid2. Pass Process::WNOHANG (as the main
#                 loop does) to return as soon as no child is waiting to be
#                 reaped. The default 0 blocks until the pool is empty or there
#                 are no children left.
#
# Waits on pid -1, so any child of this process is reaped here, not only
# listeners. When the current listener dies, the backoff is told and current is
# cleared so a new one is started later.
#
# Returns nil.
def reap_all_listeners(waitpid_flags = 0)
  until @listeners.empty?
    begin
      lpid, status = Process.waitpid2(-1, waitpid_flags)
      # nil pid: WNOHANG was given and no child has exited yet.
      return unless lpid

      log "Listener exited #{status}"

      if @listeners.current_pid == lpid
        @listener_backoff.died
        @listeners.clear_current!
      end

      if @listeners.last_good_pid == lpid
        @listeners.clear_last_good!
      end

      if dead_listener = @listeners.delete(lpid)
        listener_status dead_listener, "stop"
        dead_listener.dispose
      end

      write_procline
    rescue Errno::ECHILD
      # No child processes remain.
      return
    end
  end
end

#report_unexpected_exits ⇒ Object



230
231
232
233
234
235
236
237
# File 'lib/resqued/master.rb', line 230

# Private: install an EXIT handler that logs the exception in flight ($!),
# if any, followed by its backtrace one frame per line.
def report_unexpected_exits
  trap("EXIT") do
    log("EXIT #{$!.inspect}")
    frames = $!&.backtrace
    frames&.each { |frame| log(frame) }
  end
end

#run(ready_pipe = nil) ⇒ Object

Public: Starts the master process.



29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/resqued/master.rb', line 29

# Public: Starts the master process and runs its main loop until shutdown.
#
# ready_pipe - optional IO. After the signal handlers are installed, this
#              process's pid is written to it and it is closed. Presumably this
#              tells whoever spawned the master that it is up -- confirm with
#              the caller.
#
# The EXIT reporter is installed first so a crash anywhere below is logged. It
# is removed only after the main loop returns normally.
def run(ready_pipe = nil)
  report_unexpected_exits
  with_pidfile(@state.pidfile) do
    write_procline
    # Traps must be in place before readiness is reported.
    install_signal_handlers
    if ready_pipe
      ready_pipe.syswrite($$.to_s)
      ready_pipe.close rescue nil
    end
    go_ham
  end
  no_more_unexpected_exits
end

#start_listener ⇒ Object



109
110
111
112
113
114
115
116
# File 'lib/resqued/master.rb', line 109

# Private: boot a new listener if none is current and the backoff allows it.
# Reports the "start" status, records the start with the backoff and refreshes
# the procline.
def start_listener
  return if @listeners.current
  return if @listener_backoff.wait?

  new_listener = @listeners.start!
  listener_status new_listener, "start"
  @listener_backoff.started
  write_procline
end

#status_message(type, pid, status) ⇒ Object



261
262
263
# File 'lib/resqued/master.rb', line 261

# Private: write one "type,pid,status" line to the status pipe, if one was
# configured; otherwise do nothing and return nil.
def status_message(type, pid, status)
  return if @status_pipe.nil?

  line = "#{type},#{pid},#{status}\n"
  @status_pipe.write(line)
end

#wait_for_workers ⇒ Object



184
185
186
# File 'lib/resqued/master.rb', line 184

# Private: block until the listeners have exited. No waitpid flags are
# passed, so reaping waits rather than polling.
def wait_for_workers
  reap_all_listeners(0)
end

#worker_finished(pid) ⇒ Object

Listener message: A worker just stopped working.

Forwards the message to the other listeners.



132
133
134
135
136
137
# File 'lib/resqued/master.rb', line 132

# Listener message: worker +pid+ stopped working.
#
# Reports the "stop" status, then passes the notification to every listener in
# the pool.
def worker_finished(pid)
  worker_status(pid, "stop")
  @listeners.each { |listener| listener.worker_finished(pid) }
end

#worker_started(pid) ⇒ Object

Listener message: A worker just started working.



125
126
127
# File 'lib/resqued/master.rb', line 125

# Listener message: worker +pid+ started working; report it as "start".
def worker_started(pid)
  worker_status pid, "start"
end

#worker_status(pid, status) ⇒ Object



257
258
259
# File 'lib/resqued/master.rb', line 257

# Private: report a worker lifecycle +status+ on the status pipe.
def worker_status(pid, status)
  status_message "worker", pid, status
end

#write_procline ⇒ Object



247
248
249
# File 'lib/resqued/master.rb', line 247

# Private: set the process title ($0) to show the version, listener
# generation, number of running listeners and the original command-line args.
def write_procline
  version = procline_version
  generation = "[gen #{@state.listeners_created}]"
  running = "[#{@listeners.size} running]"
  $0 = "#{version} master #{generation} #{running} #{ARGV.join(' ')}"
end

#yawn(duration) ⇒ Object



243
244
245
# File 'lib/resqued/master.rb', line 243

# Private: sleep for up to +duration+ seconds via Sleepy#yawn, also waking
# when any listener's read pipe has data.
def yawn(duration)
  super(duration, @listeners.map(&:read_pipe))
end