Class: Divvy::Worker

Inherits:
Object
Defined in:
lib/divvy/worker.rb

Overview

Models an individual divvy worker process. These objects are used in both the master process and the forked worker processes to perform common tasks and for basic tracking.


Constructor Details

#initialize(task, number, socket, verbose = false) ⇒ Worker

Create a Worker object. The Master object typically handles this.



# File 'lib/divvy/worker.rb', line 23

def initialize(task, number, socket, verbose = false)
  @task = task
  @number = number
  @socket = socket
  @verbose = verbose
  @pid = nil
  @status = nil
end

Instance Attribute Details

#number ⇒ Object (readonly)

The worker number. Worker numbers are sequential, starting at 1 and ending at the configured worker concurrency count.



# File 'lib/divvy/worker.rb', line 8

def number
  @number
end

#pid ⇒ Object

The worker process’s pid. This is $$ when inside the worker process.



# File 'lib/divvy/worker.rb', line 20

def pid
  @pid
end

#socket ⇒ Object (readonly)

The Unix domain socket file used to communicate with the master process.



# File 'lib/divvy/worker.rb', line 11

def socket
  @socket
end

#status ⇒ Object (readonly)

The Process::Status object that results from reaping the worker. This is nil until the worker has been reaped.



# File 'lib/divvy/worker.rb', line 17

def status
  @status
end

#verbose ⇒ Object

Whether verbose log info should be written to stderr.



# File 'lib/divvy/worker.rb', line 14

def verbose
  @verbose
end

Instance Method Details

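#dequeue ⇒ Object

Internal: Retrieve an individual task item from the master process. Opens a new connection to the master's socket, then reads and unmarshals a single task item.

Returns an Array containing the arguments yielded by the dispatcher, or nil when no item could be read (the socket reported an error, the connection closed without data, or the socket file no longer exists).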



# File 'lib/divvy/worker.rb', line 122

def dequeue
  client = UNIXSocket.new(@socket)
  r, w, e = IO.select([client], nil, [client], nil)
  return if !e.empty?

  if data = client.read(16384)
    Marshal.load(data)
  end
rescue Errno::ENOENT => boom
  # socket file went away, bail out
ensure
  client.close if client
end
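
The exchange that #dequeue performs can be tried without Divvy itself. The sketch below is an illustration only: serve_one_item is a hypothetical stand-in for the master (the master's real write logic is not part of this file), fetch_item mirrors the read pattern shown above, and the socket file name and sample arguments are made up.

```ruby
require "socket"
require "tmpdir"

# Hypothetical stand-in for the master side: accept one connection, write a
# single marshaled argument list, then close so the reader sees EOF.
def serve_one_item(server, arguments)
  conn = server.accept
  conn.write(Marshal.dump(arguments))
ensure
  conn.close if conn
end

# Same read pattern as Worker#dequeue: connect, wait until the socket is
# readable, read up to 16 KiB, and unmarshal the result.
def fetch_item(path)
  client = UNIXSocket.new(path)
  _readable, _writable, errored = IO.select([client], nil, [client], nil)
  return unless errored.empty?

  data = client.read(16384)
  Marshal.load(data) if data
rescue Errno::ENOENT
  nil # socket file went away
ensure
  client.close if client
end

# Wire both sides together over a temporary socket file.
def roundtrip(arguments)
  Dir.mktmpdir do |dir|
    path = File.join(dir, "d.sock")
    server = UNIXServer.new(path)
    master = Thread.new { serve_one_item(server, arguments) }
    item = fetch_item(path)
    master.join
    server.close
    item
  end
end

p roundtrip(["job", 42])
```

Because each item travels over its own short-lived connection, the reader can treat EOF as the end of one item. A missing socket file surfaces as a nil return rather than an exception.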

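#install_signal_traps ⇒ Object

Install INT, TERM, and QUIT traps that flag the worker for graceful shutdown. #main checks the flag after each task item. Raises when called outside the worker process.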



# File 'lib/divvy/worker.rb', line 136

def install_signal_traps
  fail "attempt to install worker signal handling in master" if !worker_process?

  %w[INT TERM QUIT].each do |signal|
    trap signal do
      next if @shutdown
      @shutdown = true
      log "#{signal} received. initiating graceful shutdown..."
    end
  end
end
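
The trap-and-flag pattern above can be exercised in a single process. This is a minimal sketch rather than Divvy's code: run_until_shutdown and its parameters are hypothetical, and the process signals itself to simulate an operator sending TERM. Note that it replaces the INT, TERM, and QUIT handlers of the process that runs it.

```ruby
# Hypothetical helper: process items in order, but stop after the current
# item once INT, TERM, or QUIT has been received.
def run_until_shutdown(items, interrupt_at:)
  shutdown = false

  %w[INT TERM QUIT].each do |signal|
    Signal.trap(signal) do
      next if shutdown # ignore repeated signals
      shutdown = true
    end
  end

  processed = []
  items.each do |item|
    processed << item
    # Simulate a TERM arriving while `interrupt_at` is being processed.
    Process.kill("TERM", Process.pid) if item == interrupt_at
    sleep 0.05 # give the interpreter a chance to run the trap handler
    break if shutdown
  end
  processed
end

p run_until_shutdown(%w[a b c], interrupt_at: "b")
```

The handler only sets a flag, so the item being processed when the signal arrives is finished before the loop exits. With the input shown, the loop should stop after "b" and never reach "c".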

#kill(signal) ⇒ Object

Public: Send a signal to a running worker process.

signal - String signal name.

Returns true when the process was signaled, false if the process is no longer running. Raises when the worker process is not thought to be running.



# File 'lib/divvy/worker.rb', line 45

def kill(signal)
  fail "worker not running" if @pid.nil?
  log "sending signal #{signal}"
  Process.kill(signal, @pid)
  true
rescue Errno::ESRCH
  false
end
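
The true/false return values follow from how Process.kill reports a missing process. The sketch below reproduces that behaviour outside Divvy; signal_process and kill_demo are hypothetical names, and KILL is used only so the example child cannot outlive the demonstration.

```ruby
# Same shape as Worker#kill, minus logging and the "not running" guard.
def signal_process(signal, pid)
  Process.kill(signal, pid)
  true
rescue Errno::ESRCH
  false # no such process
end

def kill_demo
  pid = fork { sleep } # child blocks until it is signaled
  while_running = signal_process("KILL", pid)
  Process.waitpid(pid) # reap the child so its pid no longer exists
  after_reaping = signal_process("KILL", pid)
  [while_running, after_reaping]
end

p kill_demo
```

The first call should report true because the child exists, and the second should report false because the pid was released when the child was reaped.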

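#log(message) ⇒ Object

Write a message to stderr, prefixed with the worker number. This is a no-op unless verbose is enabled.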



# File 'lib/divvy/worker.rb', line 148

def log(message)
  return if !verbose
  $stderr.printf("worker [%d]: %s\n", number, message)
end

#main ⇒ Object

Internal: The main worker loop. This is called after a new worker process has been set up with signal traps. It connects to the master in a loop to retrieve task items and stops when no more items are available or a shutdown has been requested. The worker process exits when this method returns or raises an exception.



# File 'lib/divvy/worker.rb', line 101

def main
  fail "Worker#main called in master process" if !worker_process?

  log "booted with pid #{@pid}"
  @task.after_fork(self)

  while arguments = dequeue
    @task.process(*arguments)
    break if @shutdown
  end

  # worker should exit on return
rescue Exception => boom
  warn "error: worker [#{number}]: #{boom.class} #{boom.to_s}"
  exit 1
end
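
The loop above calls only two methods on the task object: after_fork(worker) once, then process(*arguments) for each item. The following sketch shows a task that would satisfy those calls. ExampleTask and run_loop are hypothetical, and run_loop substitutes an in-memory array for the #dequeue calls.

```ruby
# Hypothetical task: records what the loop asked it to do.
class ExampleTask
  attr_reader :booted, :seen

  def initialize
    @booted = false
    @seen = []
  end

  # Called once in the worker before any items are processed.
  def after_fork(worker)
    @booted = true
  end

  # Called once per item with the splatted argument list.
  def process(*arguments)
    @seen << arguments
  end
end

# Stand-in for the loop in Worker#main: `items.shift` plays the role of
# dequeue and returns nil once the list is exhausted, ending the loop.
def run_loop(task, items, worker = nil)
  task.after_fork(worker)
  while (arguments = items.shift)
    task.process(*arguments)
  end
end

task = ExampleTask.new
run_loop(task, [["a", 1], ["b", 2]])
p task.seen
```

Each dequeued Array is splatted into process, so a task that dispatches two values per item receives them as two positional arguments.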

#reap ⇒ Object

Public: Attempt to reap this worker’s process using waitpid. This is a no-op if the process is not thought to be running or is marked as already being reaped. This should only be called in the master process.

Returns the Process::Status object if the process was reaped, nil if the process was not reaped because it’s still running or is already reaped.



# File 'lib/divvy/worker.rb', line 89

def reap
  if @status.nil? && @pid && Process::waitpid(@pid, Process::WNOHANG)
    @status = $?
    log "exited, reaped pid #{@pid} (status: #{@status.exitstatus})"
    @status
  end
end
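
The nil-while-running behaviour comes from passing Process::WNOHANG to waitpid. The sketch below demonstrates it with a child that is held open by a pipe. try_reap and reap_demo are hypothetical names, and the exit code 3 is arbitrary.

```ruby
# Non-blocking reap: a Process::Status if the child has exited, nil if not.
def try_reap(pid)
  Process.waitpid(pid, Process::WNOHANG) ? $? : nil
end

def reap_demo
  reader, writer = IO.pipe
  pid = fork do
    writer.close
    reader.read # block until the parent closes its write end
    exit!(3)    # exit immediately, skipping at_exit hooks
  end
  reader.close

  before = try_reap(pid) # child is still blocked on the pipe

  writer.close # let the child exit
  after = nil
  until after
    after = try_reap(pid)
    sleep 0.01 unless after
  end

  [before, after.exitstatus]
end

p reap_demo
```

Polling this way lets a master check many children without blocking on any one of them. The first element should be nil and the second should be the child's exit code.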

#running? ⇒ Boolean

Public: Check whether the worker process is thought to be running. This does not attempt to verify the real state of the process with the system.

Returns:

  • (Boolean)


# File 'lib/divvy/worker.rb', line 34

def running?
  @pid && @status.nil?
end

#spawn ⇒ Object

Public: Fork off a new process for this worker and yield to the block immediately in the new child process.

Returns the pid of the new process in the master process. Never returns in the child process. Raises when the worker process is already thought to be running or has not yet been reaped.



# File 'lib/divvy/worker.rb', line 68

def spawn
  fail "worker already running" if running?
  @status = nil

  if (@pid = fork).nil?
    @pid = $$
    yield
    install_signal_traps
    main
    exit 0
  end

  @pid
end
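
The fork idiom used here relies on fork returning nil in the child and the child's pid in the parent. The sketch below isolates that idiom. spawn_child and child_reported_pid are hypothetical, and the child uses exit! instead of exit (unlike the method above) so that no at_exit hooks run in the example child.

```ruby
# Fork a child, run the block only in the child, and return the child's pid
# in the parent. The child never returns from this method.
def spawn_child
  if (pid = fork).nil?
    yield
    exit!(0)
  end
  pid
end

# Have the child report its own pid over a pipe so the parent can compare it
# with the value returned by spawn_child.
def child_reported_pid
  reader, writer = IO.pipe
  pid = spawn_child do
    reader.close
    writer.write(Process.pid.to_s)
    writer.close
  end
  writer.close
  reported = reader.read.to_i
  reader.close
  Process.waitpid(pid)
  [pid, reported]
end

pid, reported = child_reported_pid
puts "fork returned #{pid}, child saw #{reported}"
```

The two values should match, which is the same comparison #worker_process? makes between @pid and $$.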

#worker_process? ⇒ Boolean

Public: Check whether the current process is this worker process.

Returns true when we’re in this worker, false in the master.

Returns:

  • (Boolean)


# File 'lib/divvy/worker.rb', line 57

def worker_process?
  @pid == $$
end