Class: Resqued::Master

Inherits:
Object
  • Object
show all
Includes:
Logging, Pidfile, ProclineVersion, Sleepy
Defined in:
lib/resqued/master.rb

Overview

The master process.

  • Spawns a listener.

  • Tracks all work. (IO pipe from listener.)

  • Handles signals.

Constant Summary collapse

SIGNALS =
[ :HUP, :INT, :USR2, :CONT, :TERM, :QUIT ]
OPTIONAL_SIGNALS =
[ :INFO ]
OTHER_SIGNALS =
[:CHLD, 'EXIT']
TRAPS =
SIGNALS + OPTIONAL_SIGNALS + OTHER_SIGNALS
SIGNAL_QUEUE =
[]

Instance Method Summary collapse

Methods included from Sleepy

#awake, #self_pipe

Methods included from ProclineVersion

#procline_version

Methods included from Pidfile

#remove_pidfile, #with_pidfile, #write_pidfile

Methods included from Logging

build_logger, close_log, #log, log_file, log_file=, #log_to_stdout?, logger, logging_io, #reopen_logs

Constructor Details

#initialize(options) ⇒ Master

Returns a new instance of Master.



19
20
21
22
23
24
25
# File 'lib/resqued/master.rb', line 19

# Public: Build a master from an options hash.
#
# options - Hash-like object responding to #fetch:
#           :config_paths   - Required. Stored and later passed to each
#                             ListenerProxy that start_listener creates.
#           :master_pidfile - Optional. nil when not given.
#           :status_pipe    - Optional. nil when not given.
#
# The listener counter starts at zero and a fresh Backoff is created.
def initialize(options)
  @config_paths      = options.fetch(:config_paths)
  @pidfile           = options.fetch(:master_pidfile, nil)
  @status_pipe       = options.fetch(:status_pipe, nil)
  @listeners_created = 0
  @listener_backoff  = Backoff.new
end

Instance Method Details

#all_listeners ⇒ Object

Private: All the ListenerProxy objects.



108
109
110
# File 'lib/resqued/master.rb', line 108

# Private: Every ListenerProxy currently tracked in listener_pids.
#
# Returns a new Array built from the values of the pid map.
def all_listeners
  listener_pids.each_value.to_a
end

#dump_object_counts ⇒ Object

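Private: Logs GC statistics and the ten most common object classes, with the change in each count since the previous dump.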



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/resqued/master.rb', line 76

# Private: Log GC statistics and a per-class census of live objects.
#
# Walks ObjectSpace, counts objects by class name, and logs the ten most
# common names. When a previous census exists, each row also shows the signed
# change in count since that census. The census is remembered in @last_counts
# for the next call.
#
# Any StandardError raised while counting is logged instead of propagated.
def dump_object_counts
  log GC.stat.inspect
  census = {}
  seen = 0
  ObjectSpace.each_object do |obj|
    key = obj.class.name
    census[key] = census.fetch(key, 0) + 1
    seen += 1
  end
  limit = 10
  log "#{seen} objects. top #{limit}:"
  ranked = census.sort_by { |_name, tally| tally }.reverse
  ranked.first(limit).each do |name, tally|
    # Only names seen in the previous census get a delta.
    previous = @last_counts && @last_counts[name]
    delta = previous ? " (#{'%+d' % (tally - previous)})" : ""
    log "   #{tally} #{name}#{delta}"
  end
  @last_counts = census
  log GC.stat.inspect
rescue => e
  log "Error while counting objects: #{e}"
end

#go_ham ⇒ Object

Private: The main loop.



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/resqued/master.rb', line 43

# Private: The main loop.
#
# Every pass reads listener status, reaps exited listeners without blocking,
# starts a listener unless paused, and then handles at most one signal taken
# from the front of SIGNAL_QUEUE. The loop ends after :INT, :TERM or :QUIT.
def go_ham
  loop do
    read_listeners
    reap_all_listeners(Process::WNOHANG)
    start_listener unless @paused

    signal = SIGNAL_QUEUE.shift
    if signal.nil?
      # Nothing queued: sleep for the backoff interval, or 30 seconds when
      # the backoff reports none.
      yawn(@listener_backoff.how_long? || 30.0)
      next
    end

    case signal
    when :INFO
      dump_object_counts
    when :HUP
      reopen_logs
      log "Restarting listener with new configuration and application."
      prepare_new_listener
    when :USR2
      log "Pause job processing"
      @paused = true
      kill_listener(:QUIT, @current_listener)
      @current_listener = nil
    when :CONT
      log "Resume job processing"
      @paused = false
      kill_all_listeners(:CONT)
    when :INT, :TERM, :QUIT
      log "Shutting down..."
      # Forward the same signal that was received.
      kill_all_listeners(signal)
      wait_for_workers
      break
    end
  end
end

#install_signal_handlers ⇒ Object



220
221
222
223
224
# File 'lib/resqued/master.rb', line 220

# Private: Install the master's signal traps.
#
# :CHLD only wakes the main loop. Every signal in SIGNALS and OPTIONAL_SIGNALS
# is appended to SIGNAL_QUEUE and then wakes the main loop.
#
# Signals in OPTIONAL_SIGNALS are installed on a best-effort basis: trap raises
# ArgumentError for a signal the platform does not support, and that error is
# ignored. Any other error is allowed to propagate instead of being silently
# swallowed.
def install_signal_handlers
  trap(:CHLD) { awake }
  SIGNALS.each { |signal| trap(signal) { SIGNAL_QUEUE << signal; awake } }
  OPTIONAL_SIGNALS.each do |signal|
    begin
      trap(signal) { SIGNAL_QUEUE << signal; awake }
    rescue ArgumentError
      # Signal not supported on this platform; skip it.
    end
  end
end

#kill_all_listeners(signal) ⇒ Object



182
183
184
185
186
# File 'lib/resqued/master.rb', line 182

# Private: Send a signal to every tracked listener.
#
# signal - The signal passed to each listener's #kill.
#
# Returns the Array of listeners that were signalled.
def kill_all_listeners(signal)
  all_listeners.each { |proxy| proxy.kill(signal) }
end

#kill_listener(signal, listener) ⇒ Object



178
179
180
# File 'lib/resqued/master.rb', line 178

# Private: Send a signal to one listener, if there is one.
#
# signal   - The signal passed to the listener's #kill.
# listener - The listener to signal; nothing happens when it is nil or false.
#
# Returns whatever #kill returns, or nil when no listener was given.
def kill_listener(signal, listener)
  return unless listener

  listener.kill(signal)
end

#listener_pids ⇒ Object

Private: Map listener pids to ListenerProxy objects.



103
104
105
# File 'lib/resqued/master.rb', line 103

# Private: Map of listener pid to ListenerProxy.
#
# The Hash is created on first use and the same instance is returned on every
# later call.
def listener_pids
  @listener_pids = {} unless @listener_pids
  @listener_pids
end

#listener_running(listener) ⇒ Object

Listener message: A listener finished booting, and is ready to start workers.

Promotes a booting listener to be the current listener.



150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/resqued/master.rb', line 150

# Listener message: A listener finished booting, and is ready to start workers.
#
# Reports the listener as 'ready'. If it is the current listener, the last
# good listener is told to quit and forgotten. Otherwise the reporting
# listener itself is told to quit.
#
# listener - The listener that reported in.
def listener_running(listener)
  listener_status(listener, 'ready')

  # A listener that is not current has been superseded. It apparently missed
  # the earlier SIGQUIT (likely sent before its traps were installed), so send
  # it again.
  return kill_listener(:QUIT, listener) unless listener == @current_listener

  kill_listener(:QUIT, @last_good_listener)
  @last_good_listener = nil
end

#listener_status(listener, status) ⇒ Object



249
250
251
252
253
# File 'lib/resqued/master.rb', line 249

# Private: Report a listener's status through status_message.
#
# listener - The listener being reported; ignored when nil or without a pid.
# status   - The status text to report.
#
# Returns the result of status_message, or nil when nothing was reported.
def listener_status(listener, status)
  return unless listener && listener.pid

  status_message('listener', listener.pid, status)
end

#next_listener_id ⇒ Object



122
123
124
# File 'lib/resqued/master.rb', line 122

# Private: Advance the count of listeners created.
#
# Returns the new count, which start_listener uses as the listener id.
def next_listener_id
  @listeners_created = @listeners_created + 1
end

#no_more_unexpected_exits ⇒ Object



237
238
239
# File 'lib/resqued/master.rb', line 237

# Private: Restore the default 'EXIT' handler, replacing whatever handler was
# installed before (run installs one via report_unexpected_exits).
#
# Returns the previous handler, as reported by trap.
def no_more_unexpected_exits
  Signal.trap('EXIT', 'DEFAULT')
end

#prepare_new_listener ⇒ Object

Private: Spin up a new listener.

The old one will be killed when the new one is ready for workers.



166
167
168
169
170
171
172
173
174
175
176
# File 'lib/resqued/master.rb', line 166

# Private: Spin up a new listener.
#
# The old one will be killed when the new one is ready for workers.
#
# Returns nil.
def prepare_new_listener
  outgoing = @current_listener
  if @last_good_listener
    # Another HUP arrived before the replacement finished booting. Keep the
    # last good listener (where all the workers are) and quit the one that was
    # still booting; a new one will be started.
    kill_listener(:QUIT, outgoing)
  else
    @last_good_listener = outgoing
  end
  # A nil current listener tells `start_listener` to start a new one.
  @current_listener = nil
end

#read_listeners ⇒ Object



126
127
128
129
130
# File 'lib/resqued/master.rb', line 126

# Private: Ask every tracked listener to read its worker status, passing this
# object as the :on_activity receiver.
#
# Returns the Array of listeners that were read.
def read_listeners
  all_listeners.each { |proxy| proxy.read_worker_status(:on_activity => self) }
end

#reap_all_listeners(waitpid_flags = 0) ⇒ Object



192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# File 'lib/resqued/master.rb', line 192

# Private: Collect exited child processes until none are left to collect.
#
# For each reaped pid: logs the exit status; if the pid belongs to the current
# listener, records the death with the backoff and clears the current
# listener; removes the pid from listener_pids; reports 'stop'; disposes of
# the proxy; and rewrites the procline.
#
# waitpid_flags - Flags passed to Process.waitpid2 (default: 0). The main loop
#                 passes Process::WNOHANG.
#
# Returns nil, either when waitpid2 yields no pid or when it raises
# Errno::ECHILD.
def reap_all_listeners(waitpid_flags = 0)
  loop do
    lpid, status = Process.waitpid2(-1, waitpid_flags)
    return unless lpid

    log "Listener exited #{status}"
    if @current_listener && @current_listener.pid == lpid
      @listener_backoff.died
      @current_listener = nil
    end
    dead_listener = listener_pids.delete(lpid)
    listener_status dead_listener, 'stop'
    # waitpid2(-1) reaps any child, so the pid may not be a tracked listener.
    # listener_status already tolerates nil; dispose must as well, otherwise a
    # NoMethodError would escape into the main loop.
    dead_listener.dispose if dead_listener
    write_procline
  end
rescue Errno::ECHILD
  nil
end

#report_unexpected_exits ⇒ Object



226
227
228
229
230
231
232
233
234
235
# File 'lib/resqued/master.rb', line 226

# Private: Install an 'EXIT' handler that logs why the process is exiting.
#
# The handler logs "EXIT " followed by the inspected pending exception ($!),
# and, when there is one, each line of its backtrace.
def report_unexpected_exits
  trap('EXIT') do
    pending = $!
    log("EXIT #{pending.inspect}")
    pending.backtrace.each { |frame| log(frame) } if pending
  end
end

#run(ready_pipe = nil) ⇒ Object

Public: Starts the master process.



28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/resqued/master.rb', line 28

# Public: Starts the master process.
#
# Installs the exit reporter, then, inside with_pidfile, writes the procline,
# installs signal handlers, announces readiness and runs the main loop. The
# exit reporter is removed once the main loop has returned.
#
# ready_pipe - Optional IO (default: nil). When given, this process's pid is
#              written to it and it is closed; errors from closing are ignored.
#
# Returns the result of no_more_unexpected_exits.
def run(ready_pipe = nil)
  report_unexpected_exits
  with_pidfile(@pidfile) do
    write_procline
    install_signal_handlers
    if ready_pipe
      ready_pipe.syswrite(Process.pid.to_s)
      begin
        ready_pipe.close
      rescue StandardError
        nil
      end
    end
    go_ham
  end
  no_more_unexpected_exits
end

#start_listener ⇒ Object



112
113
114
115
116
117
118
119
120
# File 'lib/resqued/master.rb', line 112

# Private: Start a new listener unless one is already current or the backoff
# says to wait.
#
# The new ListenerProxy receives the config paths, the flattened list of
# workers reported by every tracked listener, and the next listener id. It is
# then run, reported as 'start', noted with the backoff, recorded in
# listener_pids under its pid, and the procline is rewritten.
def start_listener
  return if @current_listener
  return if @listener_backoff.wait?

  proxy = ListenerProxy.new(
    :config_paths => @config_paths,
    :old_workers  => all_listeners.map { |l| l.running_workers }.flatten,
    :listener_id  => next_listener_id
  )
  @current_listener = proxy
  proxy.run
  listener_status proxy, 'start'
  @listener_backoff.started
  # The pid is read after #run has been called.
  listener_pids[proxy.pid] = proxy
  write_procline
end

#status_message(type, pid, status) ⇒ Object



259
260
261
262
263
# File 'lib/resqued/master.rb', line 259

# Private: Write one "type,pid,status" line to the status pipe.
#
# type   - First field of the line.
# pid    - Second field of the line.
# status - Third field of the line.
#
# Returns the result of the pipe's #write, or nil when there is no status pipe.
def status_message(type, pid, status)
  return unless @status_pipe

  @status_pipe.write(format("%s,%s,%s\n", type, pid, status))
end

#wait_for_workers ⇒ Object



188
189
190
# File 'lib/resqued/master.rb', line 188

# Private: Reap listeners using reap_all_listeners' default waitpid flags.
# Called by the main loop during shutdown, after the listeners have been
# signalled.
def wait_for_workers
  reap_all_listeners
end

#worker_finished(pid) ⇒ Object

Listener message: A worker just stopped working.

Forwards the message to the other listeners.



140
141
142
143
144
145
# File 'lib/resqued/master.rb', line 140

# Listener message: A worker just stopped working.
#
# Reports the worker as 'stop', then forwards the message to every tracked
# listener.
#
# pid - The pid of the worker that stopped.
#
# Returns the Array of listeners that were notified.
def worker_finished(pid)
  worker_status(pid, 'stop')
  all_listeners.each { |proxy| proxy.worker_finished(pid) }
end

#worker_started(pid) ⇒ Object

Listener message: A worker just started working.



133
134
135
# File 'lib/resqued/master.rb', line 133

# Listener message: A worker just started working.
#
# pid - The pid of the worker; reported with status 'start'.
def worker_started(pid)
  worker_status(pid, 'start')
end

#worker_status(pid, status) ⇒ Object



255
256
257
# File 'lib/resqued/master.rb', line 255

# Private: Report a worker's status through status_message, using the type
# 'worker'.
#
# pid    - The worker's pid.
# status - The status text to report.
def worker_status(pid, status)
  status_message('worker', pid, status)
end

#write_procline ⇒ Object



245
246
247
# File 'lib/resqued/master.rb', line 245

# Private: Set the process title ($0).
#
# The title holds the procline version, the word "master", the number of
# listeners created so far, the number of tracked listeners, and the
# command-line arguments.
#
# Returns the String assigned to $0.
def write_procline
  $0 = format(
    '%s master [gen %s] [%s running] %s',
    procline_version, @listeners_created, listener_pids.size, ARGV.join(' ')
  )
end

#yawn(duration) ⇒ Object



241
242
243
# File 'lib/resqued/master.rb', line 241

# Private: Call the inherited yawn with the read pipe of every tracked
# listener added as the second argument.
#
# duration - Passed through unchanged to the inherited yawn.
def yawn(duration)
  pipes = all_listeners.map(&:read_pipe)
  super(duration, pipes)
end