Module: Einhorn::Command

Defined in:
lib/einhorn/command.rb,
lib/einhorn/command/interface.rb

Defined Under Namespace

Modules: Interface

Class Method Summary collapse

Class Method Details

.cull ⇒ Object



315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
# File 'lib/einhorn/command.rb', line 315

# Bring the worker pool back in line with the current ACK counts.
#
# * Clears Einhorn::State.upgrading once at least ack_target workers have ACKed.
# * When no upgrade is in progress, sends USR2 to every worker the pool
#   reports as old.
# * When more workers have ACKed than the target, sends USR2 to the surplus.
#
# Returns the result of the last signal_all call when surplus workers were
# signaled, nil otherwise.
def self.cull
  ack_count = Einhorn::WorkerPool.ack_count
  ack_target = Einhorn::WorkerPool.ack_target

  upgrade_done = Einhorn::State.upgrading && ack_count >= ack_target
  if upgrade_done
    Einhorn::State.upgrading = false
    Einhorn.log_info("Upgrade to version #{Einhorn::State.version} complete.")
  end

  # Old workers are only retired while no upgrade is in flight.
  old_workers = Einhorn::WorkerPool.old_workers
  unless Einhorn::State.upgrading || old_workers.empty?
    Einhorn.log_info("Killing off #{old_workers.length} old workers.")
    signal_all("USR2", old_workers)
  end

  return unless ack_count > ack_target

  excess = Einhorn::WorkerPool.acked_unsignaled_modern_workers.first(ack_count - ack_target)
  Einhorn.log_info("Have too many workers at the current version, so killing off #{excess.length} of them.")
  signal_all("USR2", excess)
end

.decrement ⇒ Object



140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/einhorn/command.rb', line 140

# Lower the configured worker count by one.
#
# Refuses to go below one worker. In both cases the outcome message is
# written to $stderr and returned to the caller.
def self.decrement
  unless Einhorn::State.config[:number] > 1
    output = "Can't decrease number of workers (already at #{Einhorn::State.config[:number]}).  Run kill #{$$} if you really want to kill einhorn."
    $stderr.puts(output)
    return output
  end

  Einhorn::Event.break_loop
  old = Einhorn::State.config[:number]
  new = old - 1
  Einhorn::State.config[:number] = new
  "Decrementing number of workers from #{old} -> #{new}".tap { |output| $stderr.puts(output) }
end

.dumpable_state ⇒ Object



155
156
157
158
159
160
161
162
163
164
165
# File 'lib/einhorn/command.rb', line 155

# Build the hash that is serialized and handed over during a reload.
#
# Returns a hash with:
#   :state                  - whatever Einhorn::State.state returns
#   :persistent_descriptors - the to_state value of every descriptor in
#                             Einhorn::Event.persistent_descriptors
def self.dumpable_state
  global_state = Einhorn::State.state
  descriptor_state = Einhorn::Event.persistent_descriptors.map(&:to_state)

  {:state => global_state, :persistent_descriptors => descriptor_state}
end

.full_upgrade ⇒ Object



284
285
286
287
288
289
290
# File 'lib/einhorn/command.rb', line 284

# Entry point for an upgrade.
#
# When Einhorn::State.path is set and we are not already in the middle of a
# reload-for-preload-upgrade, re-exec via reload_for_preload_upgrade;
# otherwise upgrade the workers in place via upgrade_workers.
def self.full_upgrade
  needs_reload = Einhorn::State.path && !Einhorn::State.reloading_for_preload_upgrade
  return reload_for_preload_upgrade if needs_reload

  upgrade_workers
end

.increment ⇒ Object



131
132
133
134
135
136
137
138
# File 'lib/einhorn/command.rb', line 131

# Raise the configured worker count by one.
#
# Breaks the event loop first, then bumps Einhorn::State.config[:number].
# The outcome message is written to $stderr and returned.
def self.increment
  Einhorn::Event.break_loop
  old = Einhorn::State.config[:number]
  new = old + 1
  Einhorn::State.config[:number] = new
  "Incrementing number of workers from #{old} -> #{new}".tap { |output| $stderr.puts(output) }
end

.louder(log = true) ⇒ Object



394
395
396
397
398
399
# File 'lib/einhorn/command.rb', line 394

# Make logging one step more verbose by lowering Einhorn::State.verbosity,
# never going below 0.
#
# log - when truthy, the resulting level is also logged via Einhorn.log_info.
#
# Returns the "Verbosity set to N" message.
def self.louder(log=true)
  current = Einhorn::State.verbosity
  Einhorn::State.verbosity = current - 1 if current > 0

  output = "Verbosity set to #{Einhorn::State.verbosity}"
  Einhorn.log_info(output) if log
  output
end

.mourn(pid) ⇒ Object

Mourn the death of your child



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/einhorn/command.rb', line 24

# Record the death of a child process.
#
# Removes the child's entry from Einhorn::State.children, bumps
# Einhorn::State.consecutive_deaths_before_ack when the child was a worker
# that had not yet been ACKed, and logs the exit at a level that depends on
# the child's :type.
#
# pid - the process ID of the exited child.
#
# Logs an error and returns early when no entry exists for pid.
def self.mourn(pid)
  unless spec = Einhorn::State.children[pid]
    Einhorn.log_error("Could not find any config for exited child #{pid.inspect}! This probably indicates a bug in Einhorn.")
    return
  end

  Einhorn::State.children.delete(pid)

  # Unacked worker
  if spec[:type] == :worker && !spec[:acked]
    Einhorn::State.consecutive_deaths_before_ack += 1
    extra = ' before it was ACKed'
  else
    extra = nil
  end

  case type = spec[:type]
  when :worker
    Einhorn.log_info("===> Exited worker #{pid.inspect}#{extra}")
  when :state_passer
    Einhorn.log_debug("===> Exited state passing process #{pid.inspect}")
  else
    # Message previously misspelled "unrecognized".
    Einhorn.log_error("===> Exited process #{pid.inspect} has unrecognized type #{type.inspect}: #{spec.inspect}")
  end
end

.prepare_child_environment ⇒ Object



260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
# File 'lib/einhorn/command.rb', line 260

# Export the EINHORN_* environment variables a worker reads.
#
# Runs in the forked child, so Process.ppid is the master's PID. Sets:
#   EINHORN_MASTER_PID - the parent's PID
#   EINHORN_SOCK_PATH  - the command socket path
#   EINHORN_SOCK_FD    - only when State.command_socket_as_fd is set: the
#                        descriptor of a freshly opened connection to that
#                        socket (the socket object is kept in
#                        TransientState.socket_handles)
#   EINHORN_FD_COUNT / EINHORN_FD_<i> - the bound descriptors, one per index
#   EINHORN_FDS        - the same descriptors, space separated (deprecated)
def self.prepare_child_environment
  # This is run from the child
  ENV['EINHORN_MASTER_PID'] = Process.ppid.to_s
  ENV['EINHORN_SOCK_PATH'] = Einhorn::Command::Interface.socket_path

  if Einhorn::State.command_socket_as_fd
    socket = UNIXSocket.open(Einhorn::Command::Interface.socket_path)
    Einhorn::TransientState.socket_handles << socket
    ENV['EINHORN_SOCK_FD'] = socket.fileno.to_s
  end

  bound = Einhorn::State.bind_fds
  ENV['EINHORN_FD_COUNT'] = bound.length.to_s
  bound.each_with_index do |fd, i|
    ENV["EINHORN_FD_#{i}"] = fd.to_s
  end

  # EINHORN_FDS is deprecated. It was originally an attempt to
  # match Upstart's nominal internal support for space-separated
  # FD lists, but nobody uses that in practice, and it makes
  # finding individual FDs more difficult
  ENV['EINHORN_FDS'] = Einhorn::State.bind_fds.map { |fd| fd.to_s }.join(' ')
end

.prepare_child_process ⇒ Object



280
281
282
# File 'lib/einhorn/command.rb', line 280

# Per-process setup for a freshly forked worker; spinup calls this first
# inside the child. Currently it only delegates to Einhorn.renice_self.
def self.prepare_child_process
  Einhorn.renice_self
end

.quieter(log = true) ⇒ Object



387
388
389
390
391
392
# File 'lib/einhorn/command.rb', line 387

# Make logging one step less verbose by raising Einhorn::State.verbosity,
# never going above 2.
#
# log - when truthy, the resulting level is also logged via Einhorn.log_info.
#
# Returns the "Verbosity set to N" message.
def self.quieter(log=true)
  current = Einhorn::State.verbosity
  Einhorn::State.verbosity = current + 1 if current < 2

  output = "Verbosity set to #{Einhorn::State.verbosity}"
  Einhorn.log_info(output) if log
  output
end

.reap ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/einhorn/command.rb', line 9

# Collect every child that has already exited, without blocking.
#
# Each reaped PID is passed to mourn and the event loop is broken so the
# rest of Einhorn can react. Stops when Process.wait reports nothing to
# collect, or when there are no children at all (Errno::ECHILD).
#
# Always returns nil.
def self.reap
  loop do
    Einhorn.log_debug('Going to reap a child process')

    pid = Process.wait(-1, Process::WNOHANG)
    break unless pid

    mourn(pid)
    Einhorn::Event.break_loop
  end
rescue Errno::ECHILD
  # No children left to wait for.
end

.register_ack(pid) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/einhorn/command.rb', line 77

# Mark the child with the given pid as ACKed.
#
# Unknown pids and children that were already ACKed are logged as errors and
# ignored. A successful ACK resets
# Einhorn::State.consecutive_deaths_before_ack to zero, logs the new ACK
# tally, and breaks the event loop.
def self.register_ack(pid)
  spec = Einhorn::State.children[pid]
  unless spec
    Einhorn.log_error("Could not find state for PID #{pid.inspect}; ignoring ACK.")
    return
  end

  if spec[:acked]
    Einhorn.log_error("Pid #{pid.inspect} already ACKed; ignoring new ACK.")
    return
  end

  # Mention the streak of unacked deaths in the log line only if there was one.
  extra = Einhorn::State.consecutive_deaths_before_ack > 0 ? ", breaking the streak of #{Einhorn::State.consecutive_deaths_before_ack} consecutive unacked workers dying" : nil
  Einhorn::State.consecutive_deaths_before_ack = 0

  spec[:acked] = true
  Einhorn.log_info("Up to #{Einhorn::WorkerPool.ack_count} / #{Einhorn::WorkerPool.ack_target} #{Einhorn::State.ack_mode[:type]} ACKs#{extra}")
  # Could call cull here directly instead, I believe.
  Einhorn::Event.break_loop
end

.register_manual_ack(pid) ⇒ Object



50
51
52
53
54
55
56
57
58
# File 'lib/einhorn/command.rb', line 50

# Handle an ACK sent explicitly on behalf of pid.
#
# Only honoured when Einhorn::State.ack_mode is of type :manual; in any other
# mode the ACK is logged as an error and dropped. Otherwise the ACK is
# forwarded to register_ack.
def self.register_manual_ack(pid)
  ack_mode = Einhorn::State.ack_mode
  if ack_mode[:type] != :manual
    Einhorn.log_error("Received a manual ACK for #{pid.inspect}, but ack_mode is #{ack_mode.inspect}. Ignoring ACK.")
    return
  end

  Einhorn.log_info("Received a manual ACK from #{pid.inspect}")
  register_ack(pid)
end

.register_timer_ack(time, pid) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/einhorn/command.rb', line 60

# Handle an ACK produced by a timer after a worker has been up for `time`
# seconds.
#
# Ignored with an error log unless Einhorn::State.ack_mode is of type :timer,
# and ignored with a debug log when the worker is no longer present in
# Einhorn::State.children. Otherwise the ACK is forwarded to register_ack.
def self.register_timer_ack(time, pid)
  ack_mode = Einhorn::State.ack_mode
  if ack_mode[:type] != :timer
    Einhorn.log_error("Received a timer ACK for #{pid.inspect}, but ack_mode is #{ack_mode.inspect}. Ignoring ACK.")
    return
  end

  if Einhorn::State.children[pid]
    Einhorn.log_info("Worker #{pid.inspect} has been up for #{time}s, so we are considering it alive.")
    register_ack(pid)
  else
    # TODO: Maybe cancel pending ACK timers upon death?
    Einhorn.log_debug("Worker #{pid.inspect} died before its timer ACK happened.")
    nil
  end
end

.reload ⇒ Object



167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/einhorn/command.rb', line 167

# Re-exec the Einhorn master in place, handing its state to the new image.
#
# A forked child serializes dumpable_state as YAML into a pipe. This process
# then restores the environment saved in Einhorn::TransientState.environ and
# execs the original script with `--with-state-fd <n>`, where <n> is the
# descriptor number of the pipe's read end.
#
# Does nothing (beyond logging) when Einhorn::State.respawn is false. If exec
# fails, the error is logged, the reloading_for_preload_upgrade flag is
# cleared, the pipe is closed, and the current process carries on.
def self.reload
  unless Einhorn::State.respawn
    Einhorn.log_info("Not reloading einhorn because we're exiting")
    return
  end

  Einhorn.log_info("Reloading einhorn (#{Einhorn::TransientState.script_name})...")

  # In case there's anything lurking
  $stdout.flush

  # Spawn a child to pass the state through the pipe
  read, write = IO.pipe
  fork do
    Einhorn::TransientState.whatami = :state_passer
    Einhorn::State.generation += 1
    # Registered in the child's copy of the state so that the dumped state
    # already contains an entry for the state passer itself.
    Einhorn::State.children[$$] = {
      :type => :state_passer
    }
    read.close

    write.write(YAML.dump(dumpable_state))
    write.close

    exit(0)
  end
  write.close

  # The new image is told to read its state from this descriptor number, so
  # the descriptor has to survive exec. Ruby 2.0+ creates pipes with
  # close-on-exec set; clear the flag where the API exists.
  read.close_on_exec = false if read.respond_to?(:close_on_exec=)

  # Reload the original environment
  ENV.clear
  ENV.update(Einhorn::TransientState.environ)

  begin
    exec [Einhorn::TransientState.script_name, Einhorn::TransientState.script_name], *(['--with-state-fd', read.fileno.to_s, '--'] + Einhorn::State.cmd)
  rescue SystemCallError => e
    # NOTE(review): ENV has already been replaced at this point and is not
    # restored on this path -- confirm that is acceptable.
    Einhorn.log_error("Could not reload! Attempting to continue. Error was: #{e}")
    Einhorn::State.reloading_for_preload_upgrade = false
    read.close
  end
end

.reload_for_preload_upgrade ⇒ Object



292
293
294
295
# File 'lib/einhorn/command.rb', line 292

# Flag the state as "reloading for a preload upgrade", then reload.
#
# full_upgrade checks this flag to choose between reloading and
# upgrade_workers; reload clears it again if its exec fails.
def self.reload_for_preload_upgrade
  Einhorn::State.reloading_for_preload_upgrade = true
  reload
end

.replenish ⇒ Object



337
338
339
340
341
342
343
344
345
# File 'lib/einhorn/command.rb', line 337

# Top up the worker pool, unless respawning is disabled.
#
# If no spinup has been recorded yet (Einhorn::State.last_spinup is unset)
# all missing workers are launched at once; afterwards replacements are
# launched through replenish_gradually.
def self.replenish
  return unless Einhorn::State.respawn

  if Einhorn::State.last_spinup
    replenish_gradually
  else
    replenish_immediately
  end
end

.replenish_gradually ⇒ Object



357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
# File 'lib/einhorn/command.rb', line 357

# Launch at most one replacement worker, rate-limited by a spinup interval.
#
# The interval is config[:seconds] multiplied by 1.5 for every consecutive
# worker death before ACK. A worker is spun up only when the last spinup is
# older than that interval. Either way a timer is scheduled to call replenish
# again after the interval.
#
# Does nothing while a spinup timer is already outstanding or when no
# workers are missing.
def self.replenish_gradually
  return if Einhorn::TransientState.has_outstanding_spinup_timer
  return if Einhorn::WorkerPool.missing_worker_count <= 0

  # Exponentially backoff automated spinup if we're just having
  # things die before ACKing
  spinup_interval = Einhorn::State.config[:seconds] * (1.5 ** Einhorn::State.consecutive_deaths_before_ack)
  seconds_ago = (Time.now - Einhorn::State.last_spinup).to_f

  if seconds_ago <= spinup_interval
    Einhorn.log_debug("Last spinup was #{seconds_ago}s ago, and spinup_interval is #{spinup_interval}s, so not spinning up a new process")
  else
    msg = "Last spinup was #{seconds_ago}s ago, and spinup_interval is #{spinup_interval}s, so spinning up a new process"

    # Only promote the message to info level when workers keep dying unacked.
    if Einhorn::State.consecutive_deaths_before_ack > 0
      Einhorn.log_info("#{msg} (there have been #{Einhorn::State.consecutive_deaths_before_ack} consecutive unacked worker deaths)")
    else
      Einhorn.log_debug(msg)
    end

    spinup
  end

  Einhorn::TransientState.has_outstanding_spinup_timer = true
  Einhorn::Event::Timer.open(spinup_interval) do
    Einhorn::TransientState.has_outstanding_spinup_timer = false
    replenish
  end
end

.replenish_immediately ⇒ Object



347
348
349
350
351
352
353
354
355
# File 'lib/einhorn/command.rb', line 347

# Launch every missing worker in one go.
#
# Logs an error and does nothing when the pool reports no missing workers,
# since callers are expected to invoke this only when some are missing.
def self.replenish_immediately
  missing = Einhorn::WorkerPool.missing_worker_count
  unless missing > 0
    Einhorn.log_error("Missing is currently #{missing.inspect}, but should always be > 0 when replenish_immediately is called. This probably indicates a bug in Einhorn.")
    return
  end

  Einhorn.log_info("Launching #{missing} new workers")
  missing.times do
    spinup
  end
end

.signal_all(signal, children = nil, record = true) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/einhorn/command.rb', line 101

# Send a signal to a set of children.
#
# signal   - the signal to deliver (passed straight to Process.kill).
# children - the pids to signal; defaults to Einhorn::WorkerPool.workers.
# record   - when truthy, the signal is added to each child's :signaled set,
#            and an error is logged if that child had already been sent it.
#
# Pids without an entry in Einhorn::State.children are skipped with an error
# log. Pids whose process no longer exists (Errno::ESRCH) are silently left
# out of the result.
#
# Returns a summary string listing the pids that were actually signaled.
def self.signal_all(signal, children=nil, record=true)
  children ||= Einhorn::WorkerPool.workers

  Einhorn.log_info("Sending #{signal} to #{children.inspect}")

  signaled = []
  children.each do |child|
    spec = Einhorn::State.children[child]
    if !spec
      Einhorn.log_error("Trying to send #{signal} to dead child #{child.inspect}. The fact we tried this probably indicates a bug in Einhorn.")
      next
    end

    if record
      already_sent = spec[:signaled].include?(signal)
      Einhorn.log_error("Re-sending #{signal} to already-signaled child #{child.inspect}. It may be slow to spin down, or it may be swallowing #{signal}s.") if already_sent
      spec[:signaled].add(signal)
    end

    delivered = begin
      Process.kill(signal, child)
      true
    rescue Errno::ESRCH
      # The process is already gone; just leave it out of the report.
      false
    end
    signaled << child if delivered
  end

  "Successfully sent #{signal}s to #{signaled.length} processes: #{signaled.inspect}"
end

.spinup(cmd = nil) ⇒ Object



208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
# File 'lib/einhorn/command.rb', line 208

# Fork one new worker and register it in Einhorn::State.children.
#
# cmd - the command (array) the worker should run; defaults to
#       Einhorn::State.cmd.
#
# In preloaded mode the child tears down the master's state and calls
# einhorn_main directly; otherwise it execs cmd. In both cases the child
# first runs prepare_child_process and, after the master's descriptors are
# closed, prepare_child_environment.
#
# The parent records the worker as unacked at the current version, stamps
# Einhorn::State.last_spinup, and in :timer ACK mode opens an ACK timer for
# the new pid.
def self.spinup(cmd=nil)
  cmd ||= Einhorn::State.cmd

  pid =
    if Einhorn::TransientState.preloaded
      fork do
        Einhorn::TransientState.whatami = :worker
        prepare_child_process

        Einhorn.log_info('About to tear down Einhorn state and run einhorn_main')
        Einhorn::Command::Interface.uninit
        Einhorn::Event.close_all_for_worker
        Einhorn.set_argv(cmd, true)

        prepare_child_environment
        einhorn_main
      end
    else
      fork do
        Einhorn::TransientState.whatami = :worker
        prepare_child_process

        Einhorn.log_info("About to exec #{cmd.inspect}")
        # Here's the only case where cloexec would help. Since we
        # have to track and manually close FDs for other cases, we
        # may as well just reuse close_all rather than also set
        # cloexec on everything.
        Einhorn::Event.close_all_for_worker

        prepare_child_environment
        exec [cmd[0], cmd[0]], *cmd[1..-1]
      end
    end

  Einhorn.log_info("===> Launched #{pid}")
  worker_spec = {
    :type => :worker,
    :version => Einhorn::State.version,
    :acked => false,
    :signaled => Set.new
  }
  Einhorn::State.children[pid] = worker_spec
  Einhorn::State.last_spinup = Time.now

  # Set up whatever's needed for ACKing
  ack_mode = Einhorn::State.ack_mode
  type = ack_mode[:type]
  if type == :timer
    Einhorn::Event::ACKTimer.open(ack_mode[:timeout], pid)
  elsif type != :manual
    # :manual needs no setup; anything else is unknown.
    Einhorn.log_error("Unrecognized ACK mode #{type.inspect}")
  end
end

.upgrade_workers ⇒ Object



297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
# File 'lib/einhorn/command.rb', line 297

# Start (or restart) a rolling upgrade of the workers.
#
# Bumps Einhorn::State.version, resets the consecutive-death counter, stamps
# Einhorn::State.last_upgraded, and launches the missing workers via
# replenish_immediately. If an upgrade is already in progress the version is
# bumped again and the process starts over.
def self.upgrade_workers
  if Einhorn::State.upgrading
    Einhorn.log_info("Currently upgrading (#{Einhorn::WorkerPool.ack_count} / #{Einhorn::WorkerPool.ack_target} ACKs; bumping version and starting over)...")
  else
    Einhorn::State.upgrading = true
    # The version is only incremented further down, so report the version we
    # are upgrading *to*; this matches the "Upgrade to version N complete."
    # message that cull logs once the upgrade finishes.
    Einhorn.log_info("Starting upgrade to #{Einhorn::State.version + 1}...")
  end

  # Reset this, since we've just upgraded to a new universe (I'm
  # not positive this is the right behavior, but it's not
  # obviously wrong.)
  Einhorn::State.consecutive_deaths_before_ack = 0
  Einhorn::State.last_upgraded = Time.now

  Einhorn::State.version += 1
  replenish_immediately
end