Class: Qless::Pool
- Inherits:
-
Object
show all
- Extended by:
- Logging
- Includes:
- Logging
- Defined in:
- lib/qless/pool.rb,
lib/qless/pool/cli.rb,
lib/qless/pool/logging.rb,
lib/qless/pool/version.rb,
lib/qless/pool/pooled_worker.rb
Defined Under Namespace
Modules: CLI, Logging, PooledWorker
Classes: QuitNowException
Constant Summary
collapse
- SIG_QUEUE_MAX_SIZE =
5
- DEFAULT_WORKER_INTERVAL =
5
- QUEUE_SIGS =
[ :QUIT, :INT, :TERM, :USR1, :USR2, :CONT, :HUP, :WINCH, ]
- CHUNK_SIZE =
(16 * 1024)
- VERSION =
"0.5.2"
Class Attribute Summary collapse
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Methods included from Logging
app, log, log_worker, procline, reopen_logs!
Constructor Details
#initialize(config) ⇒ Pool
Returns a new instance of Pool.
24
25
26
27
28
|
# File 'lib/qless/pool.rb', line 24
# Builds a pool from +config+.
#
# +config+ is passed straight to #init_config. The worker registry is a Hash
# whose default block stores a fresh empty Hash under any key on first
# access. Finally #procline is called with "(initialized)".
def initialize(config)
  init_config(config)
  @workers = Hash.new { |registry, queue_group| registry[queue_group] = {} }
  procline("(initialized)")
end
|
Class Attribute Details
.app_name ⇒ Object
Returns the value of attribute app_name.
68
69
70
|
# File 'lib/qless/pool.rb', line 68
# Reader for the @app_name instance variable (nil until assigned).
def app_name; @app_name end
|
.config_files ⇒ Object
Returns the value of attribute config_files.
68
69
70
|
# File 'lib/qless/pool.rb', line 68
# Reader for the @config_files instance variable (nil until assigned).
def config_files; @config_files end
|
.term_behavior ⇒ Object
Returns the value of attribute term_behavior.
217
218
219
|
# File 'lib/qless/pool.rb', line 217
# Reader for the @term_behavior instance variable (nil until assigned).
def term_behavior; @term_behavior end
|
Instance Attribute Details
#config ⇒ Object
Returns the value of attribute config.
21
22
23
|
# File 'lib/qless/pool.rb', line 21
# Reader for the @config instance variable (nil until assigned).
def config; @config end
|
#workers ⇒ Object
Returns the value of attribute workers.
22
23
24
|
# File 'lib/qless/pool.rb', line 22
# Reader for the @workers instance variable (nil until assigned).
def workers; @workers end
|
Class Method Details
.after_prefork(&block) ⇒ Object
The `after_prefork` hook will be run in workers if you are using the preforking master worker to save memory. Use this hook to reload database connections and so forth to ensure that they’re not shared among workers.
Call with a block to set the hook. Call with no arguments to return the hook.
51
52
53
|
# File 'lib/qless/pool.rb', line 51
# Gets or sets the after_prefork hook.
#
# With a block: stores the block as the hook and returns it.
# Without a block: returns the currently stored hook (nil if none was set).
def self.after_prefork(&block)
  return @after_prefork unless block

  @after_prefork = block
end
|
.after_prefork=(after_prefork) ⇒ Object
Set the after_prefork proc.
56
57
58
|
# File 'lib/qless/pool.rb', line 56
# Stores +after_prefork+ as the hook, replacing any previous value.
def self.after_prefork=(after_prefork); @after_prefork = after_prefork end
|
.choose_config_file ⇒ Object
81
82
83
84
85
86
87
|
# File 'lib/qless/pool.rb', line 81
# Picks the config file path to use.
#
# The QLESS_POOL_CONFIG environment variable wins when it is set; otherwise
# the first entry of @config_files that exists on disk is returned (nil when
# none exists).
def self.choose_config_file
  ENV["QLESS_POOL_CONFIG"] || @config_files.detect { |path| File.exist?(path) }
end
|
.handle_winch=(bool) ⇒ Object
77
78
79
|
# File 'lib/qless/pool.rb', line 77
# Stores +bool+ as the flag later reported by handle_winch?.
def self.handle_winch=(bool); @handle_winch = bool end
|
.handle_winch? ⇒ Boolean
74
75
76
|
# File 'lib/qless/pool.rb', line 74
# Reports the stored handle_winch flag, normalizing an unset (nil) or false
# value to false and remembering that normalization.
def self.handle_winch?
  @handle_winch = false unless @handle_winch
  @handle_winch
end
|
.pool_factory ⇒ Object
30
31
32
|
# File 'lib/qless/pool.rb', line 30
# Returns the stored pool factory, creating and remembering a new
# Qless::PoolFactory when none is stored yet.
def self.pool_factory
  return @pool_factory if @pool_factory

  @pool_factory = Qless::PoolFactory.new
end
|
.pool_factory=(factory) ⇒ Object
38
39
40
|
# File 'lib/qless/pool.rb', line 38
# Replaces the stored pool factory with +factory+.
def self.pool_factory=(factory); @pool_factory = factory end
|
.run ⇒ Object
89
90
91
92
93
94
|
# File 'lib/qless/pool.rb', line 89
# Entry point: builds a pool from the chosen config file, starts it and
# joins it, returning whatever #join returns.
#
# When the running Ruby's GC exposes copy_on_write_friendly=, it is switched
# on first.
def self.run
  GC.copy_on_write_friendly = true if GC.respond_to?(:copy_on_write_friendly=)

  pool = Qless::Pool.new(choose_config_file)
  pool.start.join
end
|
Instance Method Details
#all_known_queues ⇒ Object
340
341
342
|
# File 'lib/qless/pool.rb', line 340
# Every queue grouping that appears as a key in either the config or the
# worker registry, without duplicates (config keys first).
def all_known_queues
  (config.keys + workers.keys).uniq
end
|
#all_pids ⇒ Object
319
320
321
|
# File 'lib/qless/pool.rb', line 319
# Flat list of the keys of every per-queue worker Hash in the registry
# (the registry is written as workers[queues][pid] = worker elsewhere).
def all_pids
  workers.values.map(&:keys).flatten
end
|
#awaken_master ⇒ Object
151
152
153
154
155
156
157
158
|
# File 'lib/qless/pool.rb', line 151
# Writes a single byte to the write end of the self-pipe without blocking.
#
# Errno::EAGAIN and Errno::EINTR are rescued and the write is attempted
# again.
#
# NOTE(review): the retry is unbounded; if the pipe stays full this loops
# until the write succeeds — confirm that is intended.
def awaken_master
  self_pipe.last.write_nonblock('.')
rescue Errno::EAGAIN, Errno::EINTR
  retry
end
|
#call_after_prefork! ⇒ Object
60
61
62
|
# File 'lib/qless/pool.rb', line 60
# Invokes the class-level after_prefork hook when one is set and returns its
# result; returns the falsy hook value itself when none is set.
def call_after_prefork!
  hook = self.class.after_prefork
  hook && self.class.after_prefork.call
end
|
#config_file ⇒ Object
}}} Config: load config and config file {{{
373
374
375
|
# File 'lib/qless/pool.rb', line 373
# Total of all values in the config Hash.
#
# The sum is seeded with 0 so an empty config yields 0 rather than nil,
# which keeps numeric comparisons against the result safe.
def configured_worker_count
  config.values.inject(0) { |total, count| total + count }
end
|
#create_worker(queues) ⇒ Object
408
409
410
|
# File 'lib/qless/pool.rb', line 408
# Asks the pool factory for a worker for +queues+ and returns it.
def create_worker(queues)
  factory = pool_factory
  factory.worker(queues)
end
|
#delete_worker(pid) ⇒ Object
TODO: close any file descriptors connected to worker, if any
311
312
313
314
315
316
317
|
# File 'lib/qless/pool.rb', line 311
# Removes +pid+ from the worker registry.
#
# Each per-queue Hash is tried in turn and the search stops at the first one
# that yields a truthy entry. Returns the removed worker, or the last
# (falsy) delete result when the pid is not registered.
def delete_worker(pid)
  removed = nil
  workers.each_value do |pid_to_worker|
    removed = pid_to_worker.delete(pid)
    break if removed
  end
  removed
end
|
#environment ⇒ Object
123
124
125
126
127
128
129
130
131
|
# File 'lib/qless/pool.rb', line 123
# Name of the current environment, looked up in this order:
#   1. Rails.env, when Rails is defined and responds to :env
#   2. the RAILS_ENV constant, when defined
#   3. the first set of ENV['RACK_ENV'], ENV['RAILS_ENV'], ENV['QLESS_ENV']
# Returns nil when none of these is available.
def environment
  return Rails.env if defined?(Rails) && Rails.respond_to?(:env)
  return RAILS_ENV if defined?(RAILS_ENV)

  ENV['RACK_ENV'] || ENV['RAILS_ENV'] || ENV['QLESS_ENV']
end
|
#graceful_worker_shutdown!(signal) ⇒ Object
227
228
229
230
231
|
# File 'lib/qless/pool.rb', line 227
# Logs the received +signal+, sends QUIT to every worker and returns :break
# without waiting for the workers to exit.
def graceful_worker_shutdown!(signal)
  log("#{signal}: immediate shutdown (graceful worker shutdown)")
  signal_all_workers(:QUIT)
  :break
end
|
#graceful_worker_shutdown_and_wait!(signal) ⇒ Object
220
221
222
223
224
225
|
# File 'lib/qless/pool.rb', line 220
# Logs the received +signal+, sends QUIT to every worker, then reaps workers
# with waitpid flags 0 (no WNOHANG) before returning :break.
def graceful_worker_shutdown_and_wait!(signal)
  log "#{signal}: graceful shutdown, waiting for children"
  signal_all_workers(:QUIT)
  reap_all_workers(0)
  :break
end
|
#handle_sig_queue! ⇒ Object
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
|
# File 'lib/qless/pool.rb', line 181
# Takes the oldest queued signal (if any) and acts on it:
#
#   USR1/USR2/CONT - forwarded to every worker
#   HUP            - reload config, reopen logs, QUIT the workers, then
#                    restore the worker count
#   WINCH          - when the class opted in via handle_winch?, empty the
#                    config and adjust the worker count accordingly
#   QUIT           - graceful shutdown that waits for the workers
#   INT            - graceful shutdown without waiting
#   TERM           - depends on the class-level term_behavior string; any
#                    unrecognized value means shut everything down now
#
# Returns the value of the branch taken (nil when the queue was empty or the
# signal is not handled); the shutdown helpers return :break.
def handle_sig_queue!
  signal = sig_queue.shift

  case signal
  when :USR1, :USR2, :CONT
    log("#{signal}: sending to all workers")
    signal_all_workers(signal)
  when :HUP
    log("HUP: reload config file and reload logfiles")
    load_config
    Logging.reopen_logs!
    log("HUP: gracefully shutdown old children (which have old logfiles open)")
    signal_all_workers(:QUIT)
    log("HUP: new children will inherit new logfiles")
    maintain_worker_count
  when :WINCH
    if self.class.handle_winch?
      log("WINCH: gracefully stopping all workers")
      @config = {}
      maintain_worker_count
    end
  when :QUIT then graceful_worker_shutdown_and_wait!(signal)
  when :INT then graceful_worker_shutdown!(signal)
  when :TERM
    case self.class.term_behavior
    when "graceful_worker_shutdown_and_wait" then graceful_worker_shutdown_and_wait!(signal)
    when "graceful_worker_shutdown" then graceful_worker_shutdown!(signal)
    else shutdown_everything_now!(signal)
    end
  end
end
|
#init_config(config) ⇒ Object
103
104
105
106
107
108
109
110
111
|
# File 'lib/qless/pool.rb', line 103
# Records the configuration source and then loads it.
#
# A String or nil +config+ is stored as the config file path; anything else
# is duplicated and stored as the config itself. Returns the result of
# #load_config.
def init_config(config)
  if config.nil? || config.is_a?(String)
    @config_file = config
  else
    @config = config.dup
  end
  load_config
end
|
#init_self_pipe! ⇒ Object
140
141
142
143
144
|
# File 'lib/qless/pool.rb', line 140
# (Re)creates the self-pipe.
#
# Any IO objects currently held are closed (errors while closing are
# ignored), the array's contents are replaced in place with a fresh IO.pipe
# pair, and FD_CLOEXEC is set on both ends.
def init_self_pipe!
  self_pipe.each do |io|
    begin
      io.close
    rescue StandardError
      nil
    end
  end
  self_pipe.replace(IO.pipe)
  self_pipe.each do |io|
    io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
  end
end
|
#init_sig_handlers! ⇒ Object
146
147
148
149
|
# File 'lib/qless/pool.rb', line 146
# Installs the signal handlers: every signal in QUEUE_SIGS gets a deferred
# handler via #trap_deferred, and CHLD calls #awaken_master.
def init_sig_handlers!
  QUEUE_SIGS.each do |queued_signal|
    trap_deferred(queued_signal)
  end
  trap(:CHLD) { |_signo| awaken_master }
end
|
#join ⇒ Object
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
|
# File 'lib/qless/pool.rb', line 261
# Main loop of the pool.
#
# Each pass reaps exited workers and handles one queued signal, stopping as
# soon as the signal handler answers :break. When no signals remain queued
# it sleeps via #master_sleep and then adjusts the worker count. Every
# completed pass ends with a procline update listing the managed pids.
def join
  loop do
    reap_all_workers
    outcome = handle_sig_queue!
    break if outcome == :break

    if sig_queue.empty?
      master_sleep
      maintain_worker_count
    end
    procline("managing #{all_pids.inspect}")
  end
  procline("(shutting down)")
  log("manager finished")
end
|
#load_config ⇒ Object
113
114
115
116
117
118
119
120
121
|
# File 'lib/qless/pool.rb', line 113
# (Re)builds @config.
#
# When a config file is known, it is read, run through ERB and parsed as
# YAML; otherwise the existing @config is kept (defaulting to {}). If the
# current environment names a key in the config, that nested section is
# merged over the top level. Finally every entry whose value is a Hash is
# dropped. Returns the resulting config Hash.
def load_config
  if config_file
    # File.read (unlike IO.read) never treats a leading "|" as a command.
    # An empty YAML document parses to a falsy value, so fall back to {}.
    @config = YAML.load(ERB.new(File.read(config_file)).result) || {}
  else
    @config ||= {}
  end

  env = environment
  overrides = env && @config[env]
  config.merge!(overrides) if overrides
  config.delete_if { |_key, value| value.is_a?(Hash) }
end
|
#maintain_worker_count ⇒ Object
}}} ???: maintain_worker_count, all_known_queues {{{
332
333
334
335
336
337
338
|
# File 'lib/qless/pool.rb', line 332
# Brings the number of workers for every known queue grouping in line with
# its delta: a positive delta spawns that many workers, a negative delta
# quits that many, and zero does nothing.
def maintain_worker_count
  all_known_queues.each do |queues|
    delta = worker_delta_for(queues)
    if delta > 0
      spawn_missing_workers_for(queues, delta)
    elsif delta < 0
      quit_excess_workers_for(queues, delta.abs)
    end
  end
end
|
#master_sleep ⇒ Object
277
278
279
280
281
282
283
284
|
# File 'lib/qless/pool.rb', line 277
# Waits up to one second for the read end of the self-pipe to become
# readable, then drains it in CHUNK_SIZE reads until a non-blocking read
# raises. Errno::EAGAIN and Errno::EINTR are swallowed. Always returns nil.
def master_sleep
  reader = self_pipe.first
  ready = IO.select([reader], nil, nil, 1)
  return unless ready
  return unless ready.first && ready.first.first

  loop { reader.read_nonblock(CHUNK_SIZE) }
rescue Errno::EAGAIN, Errno::EINTR
  nil
end
|
#pids_for(queues) ⇒ Object
383
384
385
|
# File 'lib/qless/pool.rb', line 383
# Keys (pids) of the registry entry for +queues+. The registry is indexed
# with [], so any default block on it still runs for unknown groupings.
def pids_for(queues)
  pid_to_worker = workers[queues]
  pid_to_worker.keys
end
|
#pool_factory ⇒ Object
34
35
36
|
# File 'lib/qless/pool.rb', line 34
# Instance-level shortcut for the class-level pool factory.
def pool_factory; self.class.pool_factory end
|
#quit_excess_workers_for(queues, delta) ⇒ Object
352
353
354
355
356
|
# File 'lib/qless/pool.rb', line 352
# Sends QUIT to the first +delta+ pids registered for +queues+ and returns
# the list of pids that were signalled.
def quit_excess_workers_for(queues, delta)
  surplus = pids_for(queues)[0...delta]
  surplus.each { |pid| Process.kill("QUIT", pid) }
end
|
#reap_all_workers(waitpid_flags = Process::WNOHANG) ⇒ Object
}}} worker process management {{{
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
|
# File 'lib/qless/pool.rb', line 289
# Collects exited child processes and removes them from the registry.
#
# +waitpid_flags+ is passed to Process.waitpid2; @waiting_for_reaper records
# whether the flags are 0. The loop ends when waitpid2 returns no pid.
# Errno::EINTR restarts the loop; Errno::ECHILD and QuitNowException end it
# quietly. Always returns nil.
def reap_all_workers(waitpid_flags=Process::WNOHANG)
  @waiting_for_reaper = waitpid_flags == 0
  begin
    loop do
      reaped_pid, status = Process.waitpid2(-1, waitpid_flags)
      break unless reaped_pid

      worker = delete_worker(reaped_pid)
      if worker
        queue_names = worker.job_reserver.queues.collect(&:name).join(",")
        log "Reaped qless worker[#{status.pid}] (status: #{status.exitstatus}) queues: #{queue_names}"
      else
        log "Tried to reap worker [#{status.pid}], but it had already died. (status: #{status.exitstatus})"
      end
    end
  rescue Errno::EINTR
    retry
  rescue Errno::ECHILD, QuitNowException
  end
end
|
#report_worker_pool_pids ⇒ Object
253
254
255
256
257
258
259
|
# File 'lib/qless/pool.rb', line 253
# Logs either "Pool is empty" (when the registry has no entries) or the
# list of worker pids currently in the pool.
def report_worker_pool_pids
  message = workers.empty? ? "Pool is empty" : "Pool contains worker PIDs: #{all_pids.inspect}"
  log(message)
end
|
#reset_sig_handlers! ⇒ Object
177
178
179
|
# File 'lib/qless/pool.rb', line 177
# Restores the "DEFAULT" handler for every signal listed in QUEUE_SIGS.
def reset_sig_handlers!
  QUEUE_SIGS.each do |queued_signal|
    trap(queued_signal, "DEFAULT")
  end
end
|
#running_worker_count ⇒ Object
Uses qless to count the workers currently running on this machine, so we don’t double up after a restart while long-running jobs are still active.
361
362
363
364
365
366
367
368
369
370
371
|
# File 'lib/qless/pool.rb', line 361
# Counts the workers reported by the qless client whose name carries this
# machine's hostname.
#
# The name is split at its LAST "-" so hostnames that themselves contain
# hyphens (e.g. "web-01") still match.
# NOTE(review): assumes worker names look like "<hostname>-<pid>" — confirm
# against wherever the worker name is built.
def running_worker_count
  machine_hostname = Socket.gethostname
  pool_factory.client.workers.counts.count do |worker|
    worker['name'].to_s.rpartition('-').first == machine_hostname
  end
end
|
#self_pipe ⇒ Object
Sig handlers and self pipe management {{{
137
|
# File 'lib/qless/pool.rb', line 137
# Lazily-created Array that holds the self-pipe IO objects; it starts empty.
def self_pipe
  @self_pipe ||= []
end
|
#shutdown_everything_now!(signal) ⇒ Object
233
234
235
236
237
|
# File 'lib/qless/pool.rb', line 233
# Logs the received +signal+, sends TERM to every worker and returns :break
# without waiting for the workers to exit.
def shutdown_everything_now!(signal)
  log("#{signal}: immediate shutdown (and immediate worker shutdown)")
  signal_all_workers(:TERM)
  :break
end
|
#sig_queue ⇒ Object
138
|
# File 'lib/qless/pool.rb', line 138
# Lazily-created Array of signals waiting to be handled; it starts empty.
def sig_queue
  @sig_queue ||= []
end
|
#signal_all_workers(signal) ⇒ Object
323
324
325
326
327
|
# File 'lib/qless/pool.rb', line 323
# Sends +signal+ to every pid in the pool and returns the list of pids.
def signal_all_workers(signal)
  all_pids.each { |pid| Process.kill(signal, pid) }
end
|
#spawn_missing_workers_for(queues, delta) ⇒ Object
}}} methods that operate on a single grouping of queues {{{ perhaps this means a class is waiting to be extracted
348
349
350
|
# File 'lib/qless/pool.rb', line 348
# Spawns +delta+ workers for +queues+, one #spawn_worker! call per worker.
# Returns +delta+.
def spawn_missing_workers_for(queues, delta)
  delta.times do
    spawn_worker!(queues)
  end
end
|
#spawn_worker!(queues) ⇒ Object
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
|
# File 'lib/qless/pool.rb', line 387
# Creates a worker for +queues+, forks a child process to run it, and
# registers the worker in the registry under the child's pid. Returns the
# worker.
#
# Inside the child: the cached Qless worker name is cleared, the redis
# client reconnects, the after_prefork hook runs, the queued signal handlers
# are reset, and then the worker's work loop is started with the interval
# from ENV['INTERVAL'] (falling back to DEFAULT_WORKER_INTERVAL). An
# Errno::EINTR raised from the work loop is logged and the loop restarted.
def spawn_worker!(queues)
  worker = create_worker(queues)
  pid = fork do
    Qless.instance_variable_set(:@worker_name, nil)
    pool_factory.client.redis.client.reconnect
    log_worker "Starting worker #{worker}"
    call_after_prefork!
    reset_sig_handlers!
    begin
      worker.work((ENV['INTERVAL'] || DEFAULT_WORKER_INTERVAL).to_i)
    rescue Errno::EINTR
      log "Caught interrupted system call Errno::EINTR. Retrying."
      retry
    end
  end
  workers[queues][pid] = worker
end
|
#start ⇒ Object
}}} start, join, and master sleep {{{
242
243
244
245
246
247
248
249
250
251
|
# File 'lib/qless/pool.rb', line 242
# Boots the pool: sets up the self-pipe and signal handlers, spawns the
# configured workers, reports the resulting pids, and returns self so the
# call can be chained (e.g. with #join).
def start
  procline "(starting)"
  init_self_pipe!
  init_sig_handlers!
  maintain_worker_count
  procline "(started)"
  log("started manager")
  report_worker_pool_pids
  self
end
|
#trap_deferred(signal) ⇒ Object
Defers a signal for later processing in #join (master process).
162
163
164
165
166
167
168
169
170
171
172
173
174
175
|
# File 'lib/qless/pool.rb', line 162
# Installs a handler for +signal+ that queues it for later processing.
#
# While a blocking reap is in progress (@waiting_for_reaper), INT and TERM
# are not queued: the handler logs and raises QuitNowException instead.
# Otherwise the signal is appended to the signal queue and the master is
# awakened, unless the queue already holds SIG_QUEUE_MAX_SIZE entries, in
# which case the signal is logged and dropped.
def trap_deferred(signal)
  trap(signal) do |_signo|
    if @waiting_for_reaper && [:INT, :TERM].include?(signal)
      log("Recieved #{signal}: short circuiting QUIT waitpid")
      raise QuitNowException
    end

    if sig_queue.size >= SIG_QUEUE_MAX_SIZE
      log("ignoring SIG#{signal}, queue=#{sig_queue.inspect}")
    else
      sig_queue.push(signal)
      awaken_master
    end
  end
end
|
#worker_delta_for(queues) ⇒ Object
377
378
379
380
381
|
# File 'lib/qless/pool.rb', line 377
# How many workers +queues+ is short of (positive) or over (negative) its
# configured count. A positive delta is reduced to 0 when more workers are
# already running than are configured in total.
def worker_delta_for(queues)
  wanted = config.fetch(queues, 0)
  present = workers.fetch(queues, []).size
  delta = wanted - present
  return 0 if delta > 0 && running_worker_count > configured_worker_count

  delta
end
|