Class: Divvy::Master

Inherits:
Object
Defined in:
lib/divvy/master.rb

Overview

The master process used to generate and distribute task items to the worker processes.

Defined Under Namespace

Classes: BootFailure, Shutdown


Constructor Details

#initialize(task, worker_count, verbose = false, socket = nil) ⇒ Master

Create the master process object.

task         - Object that implements the Parallelizable interface.
worker_count - Number of worker processes.
verbose      - Enable verbose error logging.
socket       - The Unix domain socket filename (default: /tmp/divvy-<pid>-<object_id>.sock).

The workers array is populated with one Worker object per requested worker process. The processes themselves are not started at this point; they are booted later by #run.



# File 'lib/divvy/master.rb', line 49

def initialize(task, worker_count, verbose = false, socket = nil)
  @task = task
  @worker_count = worker_count
  @verbose = verbose
  @socket = socket || "/tmp/divvy-#{$$}-#{object_id}.sock"

  # stats
  @tasks_distributed = 0
  @failures = 0
  @spawn_count = 0

  # shutdown state
  @shutdown = false
  @graceful = true
  @reap = false

  # worker objects
  @workers = []
  (1..@worker_count).each do |worker_num|
    worker = Divvy::Worker.new(@task, worker_num, @socket, @verbose)
    workers << worker
  end
end
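As a rough sketch, assuming only the two hooks that Master itself calls on the task (`dispatch`, which #run invokes with a block, and `before_fork`, which #boot_worker invokes just before forking), a task object could look like the following. The class name `SquareTask` is hypothetical, and the full Parallelizable interface very likely includes further methods (for example whatever the worker calls for each item) that this section does not show.

```ruby
# Hypothetical task object. Only the hooks Master itself calls are sketched:
# dispatch (from Master#run) and before_fork (from Master#boot_worker).
# The real Parallelizable interface may require more, e.g. a worker-side method.
class SquareTask
  def initialize(limit)
    @limit = limit
  end

  # Master#run passes a block here; every yielded item is sent to one worker.
  def dispatch
    (1..@limit).each { |n| yield n }
  end

  # Called in the master process right before each worker is forked.
  def before_fork(worker)
    # e.g. close connections that must not be shared with the child
  end
end

# Collect items the same way Master#run receives them (|*task_item|).
items = []
SquareTask.new(3).dispatch { |*task_item| items << task_item }
p items # prints [[1], [2], [3]]
```

With divvy loaded, such an object would be passed as the first constructor argument, e.g. `Divvy::Master.new(SquareTask.new(100), 4)`, although a working task would also need the worker-side methods omitted here.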

Instance Attribute Details

#failures ⇒ Object (readonly)

Number of worker processes that have exited with a failure status since the master started processing work.



# File 'lib/divvy/master.rb', line 24

def failures
  @failures
end

#socket ⇒ Object (readonly)

The string filename of the Unix domain socket used to distribute work.



# File 'lib/divvy/master.rb', line 14

def socket
  @socket
end

#spawn_count ⇒ Object (readonly)

Number of worker processes that have been spawned since the master started processing work.



# File 'lib/divvy/master.rb', line 28

def spawn_count
  @spawn_count
end

#tasks_distributed ⇒ Object (readonly)

Number of tasks that have been distributed to worker processes.



# File 'lib/divvy/master.rb', line 20

def tasks_distributed
  @tasks_distributed
end

#verbose ⇒ Object

Enable verbose logging to stderr.



# File 'lib/divvy/master.rb', line 17

def verbose
  @verbose
end

#worker_count ⇒ Object (readonly)

The number of worker processes to boot.



# File 'lib/divvy/master.rb', line 8

def worker_count
  @worker_count
end

#workers ⇒ Object (readonly)

The array of Worker objects this master is managing.



# File 'lib/divvy/master.rb', line 11

def workers
  @workers
end

Instance Method Details

#boot_worker(worker) ⇒ Object

Internal: Boot an individual worker process. Don’t call this if the worker is thought to be running; doing so raises an error, as does calling it before the server socket has been started.

worker - The Worker object to boot.

Returns the Worker object provided.



# File 'lib/divvy/master.rb', line 212

def boot_worker(worker)
  fail "worker #{worker.number} already running" if worker.running?
  fail "attempt to boot worker without server" if !@server

  @task.before_fork(worker)

  worker.spawn do
    reset_signal_traps
    @workers = nil

    @server.close
    @server = nil

    $stdin.close
  end
  @spawn_count += 1

  worker
end
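The fork pattern behind #boot_worker can be sketched in isolation as below. This is not divvy's code: the forking itself happens inside Worker#spawn, which is not shown here, so a bare `fork` stands in for it. The point illustrated is that the child drops master-only state (here the inherited listening socket and a custom signal trap) before doing its own work, while the parent records the spawn. It assumes a platform with `fork` (not Windows).

```ruby
require "socket"
require "tmpdir"

# Stand-in for the master's listening socket (hypothetical path for the demo).
path = File.join(Dir.tmpdir, "fork-sketch-#{$$}.sock")
server = UNIXServer.new(path)
spawn_count = 0

pid = fork do
  # Child side: mirror what the boot_worker spawn block does.
  server.close                   # the child must not hold the master's listener
  Signal.trap("INT", "DEFAULT")  # child falls back to default signal handling
  exit!(0)                       # skip at_exit hooks inherited from the parent
end
spawn_count += 1                 # parent side: count the spawn

_, status = Process.wait2(pid)
server.close
File.unlink(path)
puts "spawned=#{spawn_count} success=#{status.success?}" # prints "spawned=1 success=true"
```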

#boot_workers ⇒ Object

Internal: Boot any workers that are not currently running. This is a no-op if all workers are thought to be running. No attempt is made here to verify that worker processes are actually running: only workers that have not yet been booted and those previously marked as reaped are started.



# File 'lib/divvy/master.rb', line 199

def boot_workers
  workers.each do |worker|
    next if worker.running?
    boot_worker(worker)
  end
end

#install_signal_traps ⇒ Object

Internal: Install traps for shutdown signals. Most signals deal with shutting down the master loop and socket.

INFO      - Dump stack for all processes to stderr.
TERM      - Initiate immediate forceful shutdown of all worker processes
            along with the master process, aborting any existing jobs in
            progress.
INT, QUIT - Initiate graceful shutdown, allowing existing worker processes
            to finish their current task and exit on their own. If this
            signal is received again more than 10s after the first one,
            instead initiate an immediate forceful shutdown as with TERM.
            This is mostly so you can interrupt sanely with Ctrl+C with the
            master foregrounded.
CHLD      - Set the worker reap flag. An attempt is made to reap workers
            immediately after the current dispatch iteration.

Returns the array of previously installed INT, QUIT, CHLD and TERM handlers, which reset_signal_traps later restores.



# File 'lib/divvy/master.rb', line 276

def install_signal_traps
  @traps =
    %w[INT QUIT].map do |signal|
      Signal.trap signal do
        if @shutdown
          raise Shutdown, "SIG#{signal}" if (Time.now - @shutdown) > 10 # seconds
          next
        else
          shutdown
          log "#{signal} received. initiating graceful shutdown..."
        end
      end
    end
  @traps << Signal.trap("CHLD") { @reap = true }
  @traps << Signal.trap("TERM") { raise Shutdown, "SIGTERM" }

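  # SIGINFO exists on BSD/macOS but not on Linux, where trapping it raises
  # ArgumentError, so only install this handler where the signal is supported.
  if Signal.list.key?("INFO")
    Signal.trap "INFO" do
      message = "==> info: process #$$ dumping stack\n"
      message << caller.join("\n").gsub(/^/, "    ").gsub("#{Dir.pwd}/", "")
      $stderr.puts(message)
    end
  end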

  @traps
end
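The INT/QUIT escalation rule above (first signal starts a graceful shutdown, a repeat within 10 seconds is ignored, a repeat after that forces shutdown) can be sketched as a plain object so it is testable without sending real signals. `ShutdownRequest` is a hypothetical name; in divvy the same state lives in the `@shutdown` timestamp set by #shutdown and checked inside the trap.

```ruby
# Hypothetical helper modelling the INT/QUIT escalation logic.
class ShutdownRequest
  GRACE_SECONDS = 10

  attr_reader :started_at

  def initialize
    @started_at = nil # plays the role of @shutdown (false until first signal)
  end

  # :graceful for the first signal, :ignore for a repeat within the grace
  # period, :force for a repeat after it (where divvy raises Shutdown).
  def signal_received(now = Time.now)
    if @started_at.nil?
      @started_at = now
      :graceful
    elsif now - @started_at > GRACE_SECONDS
      :force
    else
      :ignore
    end
  end
end

req = ShutdownRequest.new
t0 = Time.at(1_000)
p req.signal_received(t0)      # prints :graceful
p req.signal_received(t0 + 5)  # prints :ignore
p req.signal_received(t0 + 11) # prints :force
```

Passing the clock in as an argument is only a testing convenience; the trap in #install_signal_traps simply reads `Time.now`.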

#kill_workers(signal = 'TERM') ⇒ Object

Internal: Send a signal to all running workers.

signal - The string signal name.

Returns nothing.



# File 'lib/divvy/master.rb', line 237

def kill_workers(signal = 'TERM')
  workers.each do |worker|
    next if !worker.running?
    worker.kill(signal)
  end
end
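For a single worker, what #kill_workers does boils down to `Process.kill` on the worker's pid, after which the master still has to reap the child. The sketch below uses a raw pid in place of a Worker object (an assumption, since Worker#kill is not shown here) and sends KILL, the signal #shutdown! uses for a forceful shutdown. It assumes a platform with `fork`.

```ruby
# Sketch: signal one forked "worker" and reap it to inspect how it died.
pid = fork do
  sleep 30  # stands in for a worker busy with a task item
  exit!(0)
end

Process.kill("KILL", pid) # same signal shutdown! sends when not graceful
_, status = Process.wait2(pid)

puts status.signaled?               # prints "true"
puts Signal.signame(status.termsig) # prints "KILL"
```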

#log(message) ⇒ Object

Internal: Write a verbose log message to stderr.



# File 'lib/divvy/master.rb', line 318

def log(message)
  return if !verbose
  $stderr.printf("master: %s\n", message)
end

#master_process? ⇒ Boolean

Public: Check if the current process is the master process.

Returns true in the master process, false in the worker process.

Returns:

  • (Boolean)


# File 'lib/divvy/master.rb', line 135

def master_process?
  @workers
end

#reap_workers ⇒ Object

Internal: Attempt to reap all worker processes via Process::waitpid. This method does not block waiting for processes to exit; workers that are still running are skipped. Each reaped worker that exited with a non-success status increments #failures.

Returns an array of the Worker objects whose processes were reaped. The Worker#status attribute can be used to access the Process::Status result.



# File 'lib/divvy/master.rb', line 250

def reap_workers
  @reap = false
  workers.select do |worker|
    if status = worker.reap
      @failures += 1 if !status.success?
      worker
    end
  end
end
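Non-blocking reaping of the kind described above can be sketched with `Process.waitpid2` and `Process::WNOHANG`, which returns nil immediately for a child that has not exited yet. Plain pids stand in for Worker objects here (Worker#reap is not shown, so this is an assumption about its mechanics), and the retry loop with a short sleep loosely mirrors the drain loop in #shutdown!. It assumes a platform with `fork`.

```ruby
# Two children: one exits cleanly, one with a failure status.
pids = [0, 3].map { |code| fork { exit!(code) } }

failures = 0
reaped = []
until pids.empty?
  pids.each do |pid|
    # WNOHANG: nil if this child is still running, so we never block here.
    result = Process.waitpid2(pid, Process::WNOHANG)
    next unless result
    _, status = result
    failures += 1 unless status.success? # same rule reap_workers uses
    reaped << pid
  end
  pids -= reaped
  sleep 0.010 unless pids.empty? # brief pause before polling again
end
puts "reaped=#{reaped.size} failures=#{failures}" # prints "reaped=2 failures=1"
```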

#reset_signal_traps ⇒ Object

Internal: Uninstall signal traps set up by the install_signal_traps method. This is called immediately after forking worker processes to reset traps to their default implementations and also when the master process shuts down.



# File 'lib/divvy/master.rb', line 305

def reset_signal_traps
  return if @traps.nil? || @traps.empty?
  %w[INT QUIT CHLD TERM].each do |signal|
    handler = @traps.shift || "DEFAULT"
    if handler.is_a?(String)
      Signal.trap(signal, handler)
    else
      Signal.trap(signal, &handler)
    end
  end
end
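The pairing of #install_signal_traps and #reset_signal_traps relies on `Signal.trap` returning the previously installed handler. A minimal sketch of that save/restore pattern follows; it uses USR1/USR2 instead of INT/QUIT/CHLD/TERM purely so the demo does not disturb the process's real shutdown signals, and the lambda name `restore` is hypothetical.

```ruby
SIGNALS = %w[USR1 USR2]

# Install custom handlers, keeping whatever was installed before (in order).
saved = SIGNALS.map { |sig| Signal.trap(sig) { $stderr.puts "custom #{sig}" } }

# Restore them the way reset_signal_traps does: strings such as "DEFAULT"
# are passed as the command, Proc handlers are passed as a block.
restore = lambda do
  SIGNALS.each do |sig|
    handler = saved.shift || "DEFAULT"
    if handler.is_a?(String)
      Signal.trap(sig, handler)
    else
      Signal.trap(sig, &handler)
    end
  end
end
restore.call
```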

#run ⇒ Object

Public: Start the main run loop. This installs signal handlers into the current process, binds to the Unix domain socket, boots workers, and begins dispatching work.

The run method does not return until all generated task items have been processed, unless a shutdown signal is received or the #shutdown method is called within the loop.

Returns nothing.
Raises BootFailure when all worker processes exit before a single task item has been distributed.
Raises Shutdown when a forceful shutdown is triggered (SIGTERM, or a repeated SIGINT/SIGQUIT more than 10s after the first).



# File 'lib/divvy/master.rb', line 84

def run
  fail "Already running!!!" if @server
  fail "Attempt to run master in a worker process" if worker_process?
  install_signal_traps
  start_server

  @task.dispatch do |*task_item|
    # boot workers that haven't started yet or have been reaped
    boot_workers

    # check for shutdown or worker reap flag until a connection is pending
    # in the domain socket queue. bail out if workers exited before even
    # requesting a task item.
    while IO.select([@server], nil, nil, 0.010).nil?
      break if @shutdown
      if @reap
        reap_workers
        if !workers_running? && @tasks_distributed == 0
          raise BootFailure, "Worker processes failed to boot."
        else
          boot_workers
        end
      end
    end
    break if @shutdown

    # at this point there should be at least one connection pending.
    begin
      data = Marshal.dump(task_item)
      sock = @server.accept
      sock.write(data)
    ensure
      sock.close if sock
    end
    @tasks_distributed += 1

    break if @shutdown
    reap_workers if @reap
  end

  nil
rescue Shutdown
  @graceful = false
  @shutdown = true
ensure
  shutdown! if master_process?
end
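The dispatch handshake in #run can be sketched in a single process: a "worker" connects to the Unix socket, the master polls the listening socket with `IO.select` in 10 ms slices until a connection is pending, accepts it, writes one `Marshal`-encoded task item and closes the connection; the worker reads to EOF and decodes. The socket path and the sample item are assumptions for the demo. Because the connection is queued in the listen backlog, the client's connect completes before `accept` is called, which is also why #start_server sizes the backlog to the worker count.

```ruby
require "socket"
require "tmpdir"

path = File.join(Dir.tmpdir, "divvy-sketch-#{$$}.sock")
File.unlink(path) if File.exist?(path) # remove a stale socket file, as start_server does
server = UNIXServer.new(path)
server.listen(2) # backlog sized to the number of would-be workers

client = UNIXSocket.new(path) # stands in for a worker asking for work

# Master side: wait until a connection is pending, 10 ms at a time.
loop { break if IO.select([server], nil, nil, 0.010) }

task_item = [42, "payload"]
sock = server.accept
begin
  sock.write(Marshal.dump(task_item))
ensure
  sock.close # closing signals end-of-item to the worker
end

# Worker side: read until EOF, then decode the item.
received = Marshal.load(client.read)
client.close
server.close
File.unlink(path)
p received # prints [42, "payload"]
```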

#shutdown ⇒ Object

Public: Initiate shutdown of the run loop. The loop is not stopped by the time this method returns; the run loop returns after the current task item iteration completes. Repeated calls keep the time of the first call.



# File 'lib/divvy/master.rb', line 156

def shutdown
  @shutdown ||= Time.now
end

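#shutdown! ⇒ Object

Internal: Really shut down the Unix socket and reap all worker processes. For a graceful shutdown this doesn’t signal the workers; instead, closing the socket is relied upon to make the workers exit normally. For a forceful shutdown, running workers are sent SIGKILL until all of them have been reaped.

TODO Send SIGKILL when workers stay running for a configurable period.

Raises:

  • (Shutdown) when the shutdown was forceful (not graceful).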



# File 'lib/divvy/master.rb', line 165

def shutdown!
  fail "Master#shutdown! called in worker process" if worker_process?
  stop_server
  while workers_running?
    kill_workers("KILL") if !@graceful
    reaped = reap_workers
    sleep 0.010 if reaped.empty?
  end
  reset_signal_traps
  raise Shutdown if !@graceful
end

#start_server ⇒ Object

Internal: Create and bind to the Unix domain socket, first removing any stale socket file left at the same path. The listen backlog is set to the number of workers; with a smaller backlog, workers attempting to connect to the master may get ECONNREFUSED and exit.



# File 'lib/divvy/master.rb', line 180

def start_server
  fail "Master#start_server called in worker process" if worker_process?
  File.unlink(@socket) if File.exist?(@socket)
  @server = UNIXServer.new(@socket)
  @server.listen(worker_count)
end

#stop_server ⇒ Object

Internal: Close and remove the Unix domain socket.



# File 'lib/divvy/master.rb', line 188

def stop_server
  fail "Master#stop_server called in worker process" if worker_process?
  File.unlink(@socket) if File.exist?(@socket)
  @server.close if @server
  @server = nil
end

#worker_process? ⇒ Boolean

Public: Check if the current process is a worker process. This relies on the @workers array being set to nil in the forked worker process (see #boot_worker).

Returns true in a worker process, false in the master process.

Returns:

  • (Boolean)


# File 'lib/divvy/master.rb', line 143

def worker_process?
  !master_process?
end
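The nil-sentinel approach used by #master_process? and #worker_process? can be demonstrated with a hypothetical `TinyMaster` class: after `fork`, the child clears its copy of the workers array (as #boot_worker does), so the same object answers differently on each side of the fork. Unlike the method shown above, which returns the @workers array itself (merely truthy), this sketch returns a strict Boolean via `!@workers.nil?`. It assumes a platform with `fork`.

```ruby
# Hypothetical stand-in for Divvy::Master's process-role checks.
class TinyMaster
  def initialize
    @workers = [:w1, :w2]
  end

  def master_process?
    !@workers.nil?
  end

  def worker_process?
    !master_process?
  end

  # What boot_worker's spawn block does in the child: drop the workers array.
  def forget_workers!
    @workers = nil
  end
end

m = TinyMaster.new
pid = fork do
  m.forget_workers!                 # only affects the child's copy of m
  exit!(m.worker_process? ? 0 : 1)  # report the child's view via exit status
end
_, status = Process.wait2(pid)
puts "master? #{m.master_process?}, child reported worker: #{status.success?}"
# prints "master? true, child reported worker: true"
```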

#workers_running? ⇒ Boolean

Public: Check whether any worker processes are still running or have not yet been reaped by the master process.

Returns:

  • (Boolean)


# File 'lib/divvy/master.rb', line 149

def workers_running?
  @workers.any? { |worker| worker.running? }
end