Class: Pitchfork::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/pitchfork/worker.rb

Overview

This class and its members can be considered a stable interface and will not change in a backwards-incompatible fashion between releases of pitchfork. Knowledge of this class is generally not needed for most users of pitchfork.

Some users may want to access it in the after_worker_fork/after_mold_fork hooks. See the Pitchfork::Configurator RDoc for examples.

Direct Known Subclasses

Service

Constant Summary collapse

EXIT_SIGNALS =

:stopdoc:

[:QUIT, :TERM]

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(nr, pid: nil, generation: 0) ⇒ Worker

Returns a new instance of Worker.



19
20
21
22
23
24
25
26
27
28
# File 'lib/pitchfork/worker.rb', line 19

# Sets up the bookkeeping fields for one child process record.
#
# nr         - stored in @nr
# pid        - stored in @pid (defaults to nil)
# generation - stored in @generation (defaults to 0)
def initialize(nr, pid: nil, generation: 0)
  @nr, @pid, @generation = nr, pid, generation
  @mold = @exiting = false
  @to_io = nil
  @monitor = nil
  @requests_count = 0
  # init_state is defined outside this excerpt; it is invoked once every
  # field above has been assigned.
  init_state
end

Instance Attribute Details

#generation ⇒ Object

Returns the value of attribute generation.



16
17
18
# File 'lib/pitchfork/worker.rb', line 16

# Reader for @generation, which #initialize fills from its +generation+
# keyword (0 by default) and #promote! increments.
def generation
  @generation
end

#monitor ⇒ Object

Returns the value of attribute monitor.



17
18
19
# File 'lib/pitchfork/worker.rb', line 17

# Reader for @monitor. It is nil after #initialize and is assigned by
# #create_socketpair!.
def monitor
  @monitor
end

#nr ⇒ Object

Returns the value of attribute nr.



16
17
18
# File 'lib/pitchfork/worker.rb', line 16

# Reader for @nr, the value given to #initialize. #promoted! resets it to nil.
def nr
  @nr
end

#pid ⇒ Object

Returns the value of attribute pid.



16
17
18
# File 'lib/pitchfork/worker.rb', line 16

# Reader for @pid, which is nil unless a +pid:+ was passed to #initialize
# or assigned afterwards.
def pid
  @pid
end

#requests_count ⇒ Object (readonly)

Returns the value of attribute requests_count.



17
18
19
# File 'lib/pitchfork/worker.rb', line 17

# Reader for @requests_count, the counter bumped by
# #increment_requests_count and zeroed by #reset.
def requests_count
  @requests_count
end

Instance Method Details

#==(other) ⇒ Object

worker objects may be compared to just plain Integers



187
188
189
# File 'lib/pitchfork/worker.rb', line 187

# Equality that also lets a worker match the plain value held in @nr.
#
# Returns whatever the inherited comparison returns when it is truthy;
# otherwise false when @nr is nil, and the result of +@nr == other+
# in every remaining case.
def ==(other) # :nodoc:
  inherited = super
  return inherited if inherited
  return false if @nr.nil?

  @nr == other
end

#accept_nonblock(exception: nil) ⇒ Object

This only runs when the Rack app.call is not running; it makes the worker act like a listener.



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# File 'lib/pitchfork/worker.rb', line 164

# Reads pending messages from @to_io without blocking.
#
# Returns false when nothing is readable or when EOF was reached (after
# faking a QUIT), returns the received object for any Message that is not
# a Message::SoftKill, and returns nil if Errno::ECONNRESET is raised.
# Raises TypeError for any other value returned by recvmsg_nonblock.
# The +exception+ keyword is accepted but not used in this body.
def accept_nonblock(exception: nil) # :nodoc:
  loop do
    case buf = @to_io.recvmsg_nonblock(exception: false)
    when :wait_readable # keep waiting
      return false
    when nil # EOF monitor died, but we are at a safe place to exit
      fake_sig(:QUIT)
      return false
    when Message::SoftKill
      # trigger the signal handler
      fake_sig(buf.signum)
      # keep looping, more signals may be queued
    when Message
      # Checked after Message::SoftKill so soft kills never reach this branch.
      return buf
    else
      raise TypeError, "Unexpected recvmsg_nonblock returns: #{buf.inspect}"
    end
  end # loop, as multiple signals may be sent
rescue Errno::ECONNRESET
  nil
end

#after_fork_in_child ⇒ Object



235
236
237
# File 'lib/pitchfork/worker.rb', line 235

# Closes the object held in @monitor, if there is one.
# Returns nil when @monitor is nil.
def after_fork_in_child
  @monitor.close unless @monitor.nil?
end

#close ⇒ Object

called in both the monitor (reaping worker) and worker (SIGQUIT handler)



225
226
227
228
229
# File 'lib/pitchfork/worker.rb', line 225

# Resets the deadline to 0 (which, via #deadline=, also clears the ready
# flag) and then closes @monitor and @to_io, skipping whichever is unset.
def close # :nodoc:
  self.deadline = 0

  monitor_end = @monitor
  monitor_end.close if monitor_end

  worker_end = @to_io
  worker_end.close if worker_end
end

#create_socketpair! ⇒ Object



231
232
233
# File 'lib/pitchfork/worker.rb', line 231

# Obtains a pair from Pitchfork.socketpair, passes it through
# Info.keep_ios, and stores the two results in @to_io and @monitor
# (in that order).
def create_socketpair!
  fresh_pair = Pitchfork.socketpair
  kept_pair = Info.keep_ios(fresh_pair)
  @to_io, @monitor = kept_pair
end

#deadline ⇒ Object

called in the monitor process



212
213
214
# File 'lib/pitchfork/worker.rb', line 212

# Returns the deadline currently stored on @state_drop.
def deadline # :nodoc:
  @state_drop.deadline
end

#deadline=(value) ⇒ Object

called in the worker process



204
205
206
207
208
209
# File 'lib/pitchfork/worker.rb', line 204

# Stores +value+ as the deadline on @state_drop.
# A value equal to 0 additionally clears the ready flag first.
def deadline=(value) # :nodoc:
  if value == 0
    # (Re)setting to zero also marks the worker as not ready.
    self.ready = false
  end

  @state_drop.deadline = value
end

#exiting? ⇒ Boolean

Returns:

  • (Boolean)


30
31
32
# File 'lib/pitchfork/worker.rb', line 30

# Returns @exiting, which starts out false and is set to true by
# #soft_kill after it successfully sends one of EXIT_SIGNALS.
def exiting?
  @exiting
end

#fake_sig(sig) ⇒ Object

Call a signal handler immediately without triggering EINTR. We do not use the more obvious Process.kill(sig, $$) here since that signal delivery may be deferred. We want to avoid signal delivery while the Rack app.call is running because some database drivers (e.g. ruby-pg) may cancel pending requests.



139
140
141
142
143
144
# File 'lib/pitchfork/worker.rb', line 139

# Invokes the handler currently installed for +sig+ directly, in this
# process, rather than delivering a real signal.
def fake_sig(sig) # :nodoc:
  # trap returns the previously installed handler; +sig+ is ignored while
  # that handler runs.
  old_cb = trap(sig, "IGNORE")
  old_cb.call
ensure
  # Reinstall the previous handler whether or not it raised.
  trap(sig, old_cb)
end

#finish_promotion(control_socket) ⇒ Object



69
70
71
72
73
74
# File 'lib/pitchfork/worker.rb', line 69

# Completes a promotion: publishes @generation as
# SharedMemory.current_generation, sends a Message::MoldReady over
# +control_socket+, then switches @state_drop to SharedMemory.mold_state.
def finish_promotion(control_socket)
  SharedMemory.current_generation = @generation
  control_socket.sendmsg(Message::MoldReady.new(@nr, @pid, generation))
  @state_drop = SharedMemory.mold_state
end

#hard_kill(sig) ⇒ Object



158
159
160
# File 'lib/pitchfork/worker.rb', line 158

# Sends +sig+ to the process identified by #pid using Process.kill.
def hard_kill(sig)
  Process.kill(sig, pid)
end

#increment_requests_count(by = 1) ⇒ Object



220
221
222
# File 'lib/pitchfork/worker.rb', line 220

# Adds +by+ (1 when omitted) to @requests_count and returns the new total.
def increment_requests_count(by = 1)
  @requests_count = @requests_count + by
end

#mold? ⇒ Boolean

Returns:

  • (Boolean)


114
115
116
# File 'lib/pitchfork/worker.rb', line 114

# Returns @mold, which is false after #initialize and set to true by #promoted!.
def mold?
  @mold
end

#notify_ready(control_socket) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/pitchfork/worker.rb', line 76

# Flags this child as ready and announces it over +control_socket+.
#
# A worker sends Message::WorkerReady, a service sends
# Message::ServiceReady; any other kind of child raises a RuntimeError
# (after the ready flag has already been set).
def notify_ready(control_socket)
  self.ready = true

  if worker?
    ready_message = Message::WorkerReady.new(@nr, @pid, @generation)
  elsif service?
    ready_message = Message::ServiceReady.new(@pid, @generation)
  else
    raise "Unexpected child type"
  end

  control_socket.sendmsg(ready_message)
end

#outdated? ⇒ Boolean

Returns:

  • (Boolean)


38
39
40
# File 'lib/pitchfork/worker.rb', line 38

# True when SharedMemory.current_generation is strictly greater than this
# worker's @generation.
def outdated?
  SharedMemory.current_generation > @generation
end

#pending? ⇒ Boolean

Returns:

  • (Boolean)


34
35
36
# File 'lib/pitchfork/worker.rb', line 34

# True while @monitor is nil.
def pending?
  @monitor.nil?
end

#promote(generation) ⇒ Object



89
90
91
# File 'lib/pitchfork/worker.rb', line 89

# Sends a Message::PromoteWorker carrying +generation+ through
# send_message_nonblock and returns that call's result.
def promote(generation)
  send_message_nonblock(Message::PromoteWorker.new(generation))
end

#promote!(timeout) ⇒ Object



101
102
103
104
# File 'lib/pitchfork/worker.rb', line 101

# Bumps @generation by one, then delegates to #promoted! with +timeout+
# and returns its result.
def promote!(timeout)
  @generation = @generation + 1
  promoted!(timeout)
end

#promoted!(timeout) ⇒ Object



106
107
108
109
110
111
112
# File 'lib/pitchfork/worker.rb', line 106

# Turns this record into a mold: sets @mold, clears @nr, and points
# @state_drop at SharedMemory.mold_promotion_state. When +timeout+ is
# truthy the deadline is refreshed through #update_deadline.
# Returns self.
def promoted!(timeout)
  @mold, @nr = true, nil
  @state_drop = SharedMemory.mold_promotion_state

  if timeout
    update_deadline(timeout)
  end

  self
end

#ready=(bool) ⇒ Object



195
196
197
# File 'lib/pitchfork/worker.rb', line 195

# Stores +bool+ as the ready flag on @state_drop.
def ready=(bool)
  @state_drop.ready = bool
end

#ready? ⇒ Boolean

Returns:

  • (Boolean)


191
192
193
# File 'lib/pitchfork/worker.rb', line 191

# Returns the ready flag held by @state_drop.
def ready?
  @state_drop.ready?
end

#register_to_monitor(control_socket) ⇒ Object



55
56
57
58
59
60
# File 'lib/pitchfork/worker.rb', line 55

# Creates a fresh socketpair, sends a Message::WorkerSpawned (which
# includes @monitor) over +control_socket+, then closes @monitor.
def register_to_monitor(control_socket)
  create_socketpair!
  control_socket.sendmsg(
    Message::WorkerSpawned.new(@nr, @pid, generation, @monitor)
  )
  @monitor.close
end

#reset ⇒ Object



216
217
218
# File 'lib/pitchfork/worker.rb', line 216

# Sets the request counter (@requests_count) back to 0.
def reset
  @requests_count = 0
end

#service? ⇒ Boolean

Returns:

  • (Boolean)


118
119
120
# File 'lib/pitchfork/worker.rb', line 118

# Always false for this class.
# NOTE(review): presumably overridden by the Service subclass — confirm.
def service?
  false
end

#soft_kill(sig) ⇒ Object

monitor sends fake signals to children



147
148
149
150
151
152
153
154
155
156
# File 'lib/pitchfork/worker.rb', line 147

# Asks the child to handle +sig+ by sending it a Message::SoftKill instead
# of delivering a real signal.
#
# sig - signal name without the "SIG" prefix, as a Symbol or String
#       (e.g. :QUIT or "QUIT").
#
# Returns the result of send_message_nonblock. When that result is truthy
# and +sig+ is one of EXIT_SIGNALS, the worker is marked as exiting.
# Raises ArgumentError if +sig+ is not a key of Signal.list.
def soft_kill(sig) # :nodoc:
  signum = Signal.list.fetch(sig.to_s) do
    raise ArgumentError, "BUG: bad signal: #{sig.inspect}"
  end

  # Do not care in the odd case the buffer is full, here.
  success = send_message_nonblock(Message::SoftKill.new(signum))

  # The lookup above accepts String names, so normalize to a Symbol before
  # checking EXIT_SIGNALS; otherwise "QUIT" would never set @exiting.
  @exiting = true if success && EXIT_SIGNALS.include?(sig.to_sym)
  success
end

#spawn_service(_new_service) ⇒ Object



97
98
99
# File 'lib/pitchfork/worker.rb', line 97

# Sends a Message::SpawnService through send_message_nonblock and returns
# that call's result. The +_new_service+ argument is not used.
def spawn_service(_new_service)
  send_message_nonblock(Message::SpawnService.new)
end

#spawn_worker(new_worker) ⇒ Object



93
94
95
# File 'lib/pitchfork/worker.rb', line 93

# Sends a Message::SpawnWorker carrying +new_worker.nr+ through
# send_message_nonblock and returns that call's result.
def spawn_worker(new_worker)
  send_message_nonblock(Message::SpawnWorker.new(new_worker.nr))
end

#start_promotion(control_socket) ⇒ Object



62
63
64
65
66
67
# File 'lib/pitchfork/worker.rb', line 62

# Creates a fresh socketpair, sends a Message::MoldSpawned (which
# includes @monitor) over +control_socket+, then closes @monitor.
def start_promotion(control_socket)
  create_socketpair!
  control_socket.sendmsg(
    Message::MoldSpawned.new(@nr, @pid, generation, @monitor)
  )
  @monitor.close
end

#to_io ⇒ Object

IO.select-compatible



126
127
128
# File 'lib/pitchfork/worker.rb', line 126

# Returns the result of calling to_io on the object stored in @to_io.
def to_io # IO.select-compatible
  @to_io.to_io
end

#to_log ⇒ Object



239
240
241
242
243
244
245
# File 'lib/pitchfork/worker.rb', line 239

# Builds a short log label: "mold gen=G" for a mold or "worker=N gen=G"
# otherwise, with " pid=P" appended when a pid is known.
def to_log
  role = mold? ? "mold" : "worker=#{nr}"
  label = "#{role} gen=#{generation}"
  return label unless pid

  "#{label} pid=#{pid}"
end

#update(message) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/pitchfork/worker.rb', line 42

# Copies every member of +message+ onto this object through the matching
# "<member>=" writer, then re-points @state_drop for mold messages:
# Message::MoldSpawned selects SharedMemory.mold_promotion_state and
# Message::MoldReady selects SharedMemory.mold_state.
def update(message)
  message.class.members.each do |field|
    incoming = message.public_send(field)
    send("#{field}=", incoming)
  end

  if message.is_a?(Message::MoldSpawned)
    @state_drop = SharedMemory.mold_promotion_state
  elsif message.is_a?(Message::MoldReady)
    @state_drop = SharedMemory.mold_state
  end
end

#update_deadline(timeout) ⇒ Object



199
200
201
# File 'lib/pitchfork/worker.rb', line 199

# Sets the deadline (via #deadline=) to Pitchfork.time_now(true) plus +timeout+.
def update_deadline(timeout)
  self.deadline = Pitchfork.time_now(true) + timeout
end

#worker? ⇒ Boolean

Returns:

  • (Boolean)


122
123
124
# File 'lib/pitchfork/worker.rb', line 122

# True only for a child that is neither a mold nor a service.
def worker?
  !(mold? || service?)
end