Class: Qless::Pool

Inherits:
Object
  • Object
show all
Extended by:
Logging
Includes:
Logging
Defined in:
lib/qless/pool.rb,
lib/qless/pool/cli.rb,
lib/qless/pool/logging.rb,
lib/qless/pool/version.rb,
lib/qless/pool/pooled_worker.rb

Defined Under Namespace

Modules: CLI, Logging, PooledWorker Classes: QuitNowException

Constant Summary collapse

SIG_QUEUE_MAX_SIZE =
5
DEFAULT_WORKER_INTERVAL =
5
QUEUE_SIGS =
[ :QUIT, :INT, :TERM, :USR1, :USR2, :CONT, :HUP, :WINCH, ]
CHUNK_SIZE =
(16 * 1024)
VERSION =
"0.5.2"

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Logging

app, log, log_worker, procline, reopen_logs!

Constructor Details

#initialize(config) ⇒ Pool

Returns a new instance of Pool.



24
25
26
27
28
# File 'lib/qless/pool.rb', line 24

# Builds a pool: hands +config+ to #init_config, prepares the per-queue
# worker registry, and updates the process title via #procline.
#
# @param config [Object] forwarded unchanged to #init_config
def initialize(config)
  init_config(config)
  # First access to an unknown queue key stores (and returns) a fresh Hash.
  @workers = Hash.new do |registry, queue_key|
    registry[queue_key] = {}
  end
  procline("(initialized)")
end

Class Attribute Details

.app_nameObject

Returns the value of attribute app_name.



68
69
70
# File 'lib/qless/pool.rb', line 68

# Attribute reader.
#
# @return [Object] whatever is currently stored in @app_name
def app_name
  @app_name
end

.config_filesObject

Returns the value of attribute config_files.



68
69
70
# File 'lib/qless/pool.rb', line 68

# Attribute reader.
#
# @return [Object] whatever is currently stored in @config_files
def config_files
  @config_files
end

.term_behaviorObject

Returns the value of attribute term_behavior.



217
218
219
# File 'lib/qless/pool.rb', line 217

# Attribute reader.
#
# @return [Object] whatever is currently stored in @term_behavior
def term_behavior
  @term_behavior
end

Instance Attribute Details

#configObject (readonly)

Returns the value of attribute config.



21
22
23
# File 'lib/qless/pool.rb', line 21

# Read-only attribute reader.
#
# @return [Object] whatever is currently stored in @config
def config
  @config
end

#workersObject (readonly)

Returns the value of attribute workers.



22
23
24
# File 'lib/qless/pool.rb', line 22

# Read-only attribute reader.
#
# @return [Object] whatever is currently stored in @workers
def workers
  @workers
end

Class Method Details

.after_prefork(&block) ⇒ Object

The `after_prefork` hook will be run in workers if you are using the preforking master worker to save memory. Use this hook to reload database connections and so forth, to ensure that they're not shared among workers.

Call with a block to set the hook. Call with no arguments to return the hook.



51
52
53
# File 'lib/qless/pool.rb', line 51

# Combined getter/setter for the after-prefork hook.
#
# With a block: stores the block in @after_prefork and returns it.
# Without a block: returns the currently stored hook (nil if never set).
#
# @return [Proc, nil]
def self.after_prefork(&block)
  return @after_prefork unless block

  @after_prefork = block
end

.after_prefork=(after_prefork) ⇒ Object

Set the after_prefork proc.



56
57
58
# File 'lib/qless/pool.rb', line 56

# Stores +after_prefork+ in @after_prefork, replacing any previous hook.
#
# @param after_prefork [Object] the new hook (expected to respond to #call,
#   since #call_after_prefork! invokes it that way)
def self.after_prefork=(after_prefork)
  @after_prefork = after_prefork
end

.choose_config_fileObject



81
82
83
84
85
86
87
# File 'lib/qless/pool.rb', line 81

# Picks the config file path to use.
#
# The QLESS_POOL_CONFIG environment variable wins when it is set; otherwise
# the first entry of @config_files that exists on disk is returned.
#
# @return [String, nil] the chosen path, or nil when no candidate exists
def self.choose_config_file
  ENV["QLESS_POOL_CONFIG"] || @config_files.detect { |candidate| File.exist?(candidate) }
end

.handle_winch=(bool) ⇒ Object



77
78
79
# File 'lib/qless/pool.rb', line 77

# Stores +bool+ in @handle_winch; #handle_sig_queue! consults this flag (via
# handle_winch?) before acting on a WINCH signal.
#
# @param bool [Boolean]
def self.handle_winch=(bool)
  @handle_winch = bool
end

.handle_winch?Boolean

Returns:

  • (Boolean)


74
75
76
# File 'lib/qless/pool.rb', line 74

# Whether WINCH handling is enabled. A nil/false @handle_winch is normalised
# to false (and stored) before being returned.
#
# @return [Boolean]
def self.handle_winch?
  @handle_winch = false unless @handle_winch
  @handle_winch
end

.pool_factoryObject



30
31
32
# File 'lib/qless/pool.rb', line 30

# Returns the stored pool factory, creating and memoizing a
# Qless::PoolFactory on first use.
#
# @return [Object] the factory held in @pool_factory
def self.pool_factory
  return @pool_factory if @pool_factory

  @pool_factory = Qless::PoolFactory.new
end

.pool_factory=(factory) ⇒ Object



38
39
40
# File 'lib/qless/pool.rb', line 38

# Stores +factory+ in @pool_factory, overriding the lazily-built default.
#
# @param factory [Object] the factory later returned by .pool_factory
def self.pool_factory=(factory)
  @pool_factory = factory
end

.runObject



89
90
91
92
93
94
# File 'lib/qless/pool.rb', line 89

# Entry point: builds a pool from the chosen config file, starts it and
# blocks in #join.
#
# @return [Object] whatever #join returns
def self.run
  # Only some Ruby builds expose this GC switch; enable it when available.
  GC.copy_on_write_friendly = true if GC.respond_to?(:copy_on_write_friendly=)

  pool = Qless::Pool.new(choose_config_file)
  pool.start.join
end

Instance Method Details

#all_known_queuesObject



340
341
342
# File 'lib/qless/pool.rb', line 340

# Every queue grouping mentioned either in the config or in the worker
# registry, without duplicates (config keys first).
#
# @return [Array]
def all_known_queues
  (config.keys + workers.keys).uniq
end

#all_pidsObject



319
320
321
# File 'lib/qless/pool.rb', line 319

# Flat list of the keys (PIDs) of every per-queue hash in the worker
# registry.
#
# @return [Array]
def all_pids
  pids_per_queue = workers.values.map { |pid_to_worker| pid_to_worker.keys }
  pids_per_queue.flatten
end

#awaken_masterObject



151
152
153
154
155
156
157
158
# File 'lib/qless/pool.rb', line 151

# Wakes the master out of its IO.select in #master_sleep by writing one byte
# to the write end of the self-pipe.
#
# An interrupted write (EINTR) is retried. A full pipe (EAGAIN) is ignored:
# unread bytes are already pending, so the master will wake up anyway.
# Retrying in that case would spin until the master drains the pipe, and
# this method is called from signal handlers.
#
# @return [Integer, nil] bytes written, or nil when the pipe was full
def awaken_master
  self_pipe.last.write_nonblock('.') # wakeup master process from select
rescue Errno::EINTR
  retry
rescue Errno::EAGAIN
  # pipe is full, master should wake up anyways
  nil
end

#call_after_prefork!Object



60
61
62
# File 'lib/qless/pool.rb', line 60

# Invokes the class-level after_prefork hook when one is set.
#
# @return [Object, nil] the hook's return value, or nil when no hook is set
def call_after_prefork!
  hook = self.class.after_prefork
  hook && hook.call
end

#config_fileObject

Config: load config and config file.



99
100
101
# File 'lib/qless/pool.rb', line 99

# Path of the config file to load.
#
# An explicitly stored @config_file always wins. Otherwise, when no @config
# is held, the class-level chooser picks a file; when a @config is already
# held the result is false (nothing to load).
#
# @return [String, false, nil]
def config_file
  return @config_file if @config_file

  @config ? false : ::Qless::Pool.choose_config_file
end

#configured_worker_countObject



373
374
375
# File 'lib/qless/pool.rb', line 373

# Total number of workers requested across all configured queue groupings.
#
# The fold is seeded with 0 so that an empty config yields 0 rather than nil
# (an unseeded inject returns nil for an empty collection).
#
# @return [Integer]
def configured_worker_count
  config.values.inject(0) { |sum, count| sum + count }
end

#create_worker(queues) ⇒ Object



408
409
410
# File 'lib/qless/pool.rb', line 408

# Asks the pool factory to build a worker for the given queue grouping.
#
# @param queues [Object] queue grouping key, passed through to the factory
# @return [Object] whatever pool_factory.worker returns
def create_worker(queues)
  pool_factory.worker(queues)
end

#delete_worker(pid) ⇒ Object

TODO: close any file descriptors connected to worker, if any



311
312
313
314
315
316
317
# File 'lib/qless/pool.rb', line 311

# Removes +pid+ from the worker registry, searching each queue grouping in
# turn and stopping at the first one that held it.
#
# @param pid [Integer] process id used as key in the per-queue hashes
# @return [Object, nil] the removed worker, or nil when the pid was unknown
def delete_worker(pid)
  workers.each_value do |pid_to_worker|
    removed = pid_to_worker.delete(pid)
    return removed if removed
  end
  nil
end

#environmentObject



123
124
125
126
127
128
129
130
131
# File 'lib/qless/pool.rb', line 123

# Name of the current application environment.
#
# Lookup order: Rails.env (when Rails responds to it), the RAILS_ENV
# constant, then the first set variable among RACK_ENV, RAILS_ENV and
# QLESS_ENV.
#
# @return [String, nil] nil when none of the sources is available
def environment
  return Rails.env if defined?(Rails) && Rails.respond_to?(:env)
  return RAILS_ENV if defined?(RAILS_ENV)

  ENV.values_at('RACK_ENV', 'RAILS_ENV', 'QLESS_ENV').compact.first
end

#graceful_worker_shutdown!(signal) ⇒ Object



227
228
229
230
231
# File 'lib/qless/pool.rb', line 227

# Logs the received signal, sends QUIT to every worker and tells the caller
# to leave the master loop without waiting for the children.
#
# @param signal [Symbol, String] signal name, interpolated into the log line
# @return [Symbol] always :break
def graceful_worker_shutdown!(signal)
  message = "#{signal}: immediate shutdown (graceful worker shutdown)"
  log(message)
  signal_all_workers(:QUIT)
  :break
end

#graceful_worker_shutdown_and_wait!(signal) ⇒ Object



220
221
222
223
224
225
# File 'lib/qless/pool.rb', line 220

# Logs the received signal, sends QUIT to every worker, then reaps children
# with waitpid flags of 0 before telling the caller to leave the master loop.
#
# @param signal [Symbol, String] signal name, interpolated into the log line
# @return [Symbol] always :break
def graceful_worker_shutdown_and_wait!(signal)
  message = "#{signal}: graceful shutdown, waiting for children"
  log(message)
  signal_all_workers(:QUIT)
  blocking_flags = 0 # will hang until all workers are shutdown
  reap_all_workers(blocking_flags)
  :break
end

#handle_sig_queue!Object



181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/qless/pool.rb', line 181

# Takes the oldest deferred signal off the queue and acts on it.
#
# * USR1/USR2/CONT - forwarded to every worker.
# * HUP   - reloads config and log files, asks current workers to QUIT and
#           tops the pool back up.
# * WINCH - only when the class-level handle_winch? flag is on: empties the
#           config and lets #maintain_worker_count react to it.
# * QUIT  - graceful shutdown, waiting for children.
# * INT   - graceful shutdown, not waiting.
# * TERM  - behaviour selected by the class-level term_behavior setting,
#           defaulting to #shutdown_everything_now!.
#
# @return [Symbol, Object, nil] :break from the shutdown branches (the
#   master loop stops on it); otherwise the last expression of the branch,
#   or nil when the queue was empty or nothing matched
def handle_sig_queue!
  signal = sig_queue.shift

  case signal
  when :USR1, :USR2, :CONT
    log "#{signal}: sending to all workers"
    signal_all_workers(signal)
  when :HUP
    log "HUP: reload config file and reload logfiles"
    load_config
    Logging.reopen_logs!
    log "HUP: gracefully shutdown old children (which have old logfiles open)"
    signal_all_workers(:QUIT)
    log "HUP: new children will inherit new logfiles"
    maintain_worker_count
  when :WINCH
    if self.class.handle_winch?
      log "WINCH: gracefully stopping all workers"
      @config = {}
      maintain_worker_count
    end
  when :QUIT then graceful_worker_shutdown_and_wait!(signal)
  when :INT  then graceful_worker_shutdown!(signal)
  when :TERM
    behavior = self.class.term_behavior
    if behavior == "graceful_worker_shutdown_and_wait"
      graceful_worker_shutdown_and_wait!(signal)
    elsif behavior == "graceful_worker_shutdown"
      graceful_worker_shutdown!(signal)
    else
      shutdown_everything_now!(signal)
    end
  end
end

#init_config(config) ⇒ Object



103
104
105
106
107
108
109
110
111
# File 'lib/qless/pool.rb', line 103

# Records where the configuration comes from, then loads it.
#
# A String (or nil) is remembered as the config file path; anything else is
# duplicated and kept as the in-memory config.
#
# @param config [String, nil, Object]
# @return [Object] whatever #load_config returns
def init_config(config)
  if config.nil? || config.is_a?(String)
    @config_file = config
  else
    @config = config.dup
  end

  load_config
end

#init_self_pipe!Object



140
141
142
143
144
# File 'lib/qless/pool.rb', line 140

# (Re)creates the self-pipe: closes any IOs currently held (ignoring errors
# from closing them), swaps in a fresh IO.pipe pair and marks both ends
# close-on-exec.
#
# @return [Array<IO>] the self-pipe array holding the new pair
def init_self_pipe!
  self_pipe.each do |stale_io|
    begin
      stale_io.close
    rescue StandardError
      nil
    end
  end

  self_pipe.replace(IO.pipe)

  self_pipe.each do |fresh_io|
    fresh_io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
  end
end

#init_sig_handlers!Object



146
147
148
149
# File 'lib/qless/pool.rb', line 146

# Installs the master's signal handlers: every signal in QUEUE_SIGS is
# deferred through #trap_deferred, and CHLD simply wakes the master.
#
# @return [Object] the previous CHLD handler, as returned by trap
def init_sig_handlers!
  QUEUE_SIGS.each do |deferred_signal|
    trap_deferred(deferred_signal)
  end

  trap(:CHLD) do |_signo|
    awaken_master
  end
end

#joinObject



261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
# File 'lib/qless/pool.rb', line 261

# Master loop. Each iteration reaps exited children, handles one deferred
# signal and, when no further signals are queued, sleeps and then tops the
# pool back up. The loop ends when #handle_sig_queue! returns :break.
def join
  loop do
    reap_all_workers
    break if handle_sig_queue! == :break
    # Only sleep when nothing is pending, so queued signals are handled
    # back-to-back on the following iterations.
    if sig_queue.empty?
      master_sleep
      maintain_worker_count
    end
    procline("managing #{all_pids.inspect}")
  end
  procline("(shutting down)")
  #stop # gracefully shutdown all workers on our way out
  log "manager finished"
  #unlink_pid_safe(pid) if pid
end

#load_configObject



113
114
115
116
117
118
119
120
121
# File 'lib/qless/pool.rb', line 113

# (Re)loads @config.
#
# When #config_file yields a path, the file is rendered through ERB and
# parsed as YAML. Otherwise the existing @config (or an empty Hash) is kept.
# Settings nested under the current #environment key are then merged into
# the top level, and every remaining Hash-valued entry (the per-environment
# sections) is dropped.
#
# NOTE: the file is evaluated as ERB, so embedded Ruby in it is executed.
#
# @return [Hash] the flattened config
def load_config
  if config_file
    # YAML.load yields a falsy value for an empty document; fall back to an
    # empty Hash so the merge/delete_if below do not blow up.
    @config = YAML.load(ERB.new(IO.read(config_file)).result) || {}
  else
    @config ||= {}
  end

  env = environment
  config.merge!(@config[env]) if env && @config[env]
  config.delete_if { |_key, value| value.is_a?(Hash) }
end

#maintain_worker_countObject

Worker-count maintenance: maintain_worker_count, all_known_queues.



332
333
334
335
336
337
338
# File 'lib/qless/pool.rb', line 332

# Brings every known queue grouping to its target size: spawns workers when
# the delta is positive, asks surplus workers to quit when it is negative.
#
# @return [Array] the list of queue groupings that was iterated
def maintain_worker_count
  all_known_queues.each do |queues|
    delta = worker_delta_for(queues)

    if delta > 0
      spawn_missing_workers_for(queues, delta)
    elsif delta < 0
      quit_excess_workers_for(queues, delta.abs)
    end
  end
end

#master_sleepObject



277
278
279
280
281
282
283
284
# File 'lib/qless/pool.rb', line 277

# Sleeps for up to one second, waking early when a byte arrives on the
# self-pipe (written by #awaken_master), then drains whatever is pending.
def master_sleep
  begin
    # IO.select returns nil on timeout: nothing to drain.
    ready = IO.select([self_pipe.first], nil, nil, 1) or return
    ready.first && ready.first.first or return
    # Drain until read_nonblock raises EAGAIN (rescued below), which ends
    # the otherwise endless loop.
    loop { self_pipe.first.read_nonblock(CHUNK_SIZE) }
  rescue Errno::EAGAIN, Errno::EINTR
  end
end

#pids_for(queues) ⇒ Object



383
384
385
# File 'lib/qless/pool.rb', line 383

# PIDs registered for one queue grouping.
#
# @param queues [Object] queue grouping key into the worker registry
# @return [Array] keys of the per-queue hash stored under +queues+
def pids_for(queues)
  workers[queues].keys
end

#pool_factoryObject



34
35
36
# File 'lib/qless/pool.rb', line 34

# Instance-level shortcut to the class-level pool factory.
#
# @return [Object] whatever self.class.pool_factory returns
def pool_factory
  self.class.pool_factory
end

#quit_excess_workers_for(queues, delta) ⇒ Object



352
353
354
355
356
# File 'lib/qless/pool.rb', line 352

# Sends QUIT to the first +delta+ PIDs registered for +queues+.
#
# @param queues [Object] queue grouping key
# @param delta [Integer] number of workers to ask to quit
# @return [Array] the PIDs that were signalled
def quit_excess_workers_for(queues, delta)
  surplus_pids = pids_for(queues)[0...delta]
  surplus_pids.each { |pid| Process.kill("QUIT", pid) }
end

#reap_all_workers(waitpid_flags = Process::WNOHANG) ⇒ Object

Worker process management.



289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
# File 'lib/qless/pool.rb', line 289

# Collects exited child processes and removes them from the worker registry.
#
# @param waitpid_flags [Integer] flags handed to Process.waitpid2; the
#   default (WNOHANG) polls, while 0 blocks until a child exits
def reap_all_workers(waitpid_flags=Process::WNOHANG)
  # Read by the handlers installed in #trap_deferred: while a blocking reap
  # is in progress, INT/TERM raise QuitNowException to break out of it.
  @waiting_for_reaper = waitpid_flags == 0
  begin
    loop do
      # -1, wait for any child process
      wpid, status = Process.waitpid2(-1, waitpid_flags)
      break unless wpid

      if worker = delete_worker(wpid)
        log "Reaped qless worker[#{status.pid}] (status: #{status.exitstatus}) queues: #{worker.job_reserver.queues.collect(&:name).join(",")}"
      else
        # this died before it could be killed, so it's not going to have any extra info
        log "Tried to reap worker [#{status.pid}], but it had already died. (status: #{status.exitstatus})"
      end
    end
  rescue Errno::EINTR
    retry
  rescue Errno::ECHILD, QuitNowException
    # ECHILD: no children left to wait for. QuitNowException: a signal
    # handler asked us to stop waiting. Either way, reaping is finished.
  end
end

#report_worker_pool_pidsObject



253
254
255
256
257
258
259
# File 'lib/qless/pool.rb', line 253

# Logs the PIDs currently held in the pool, or that the pool is empty.
#
# Emptiness is decided from the PID list itself rather than from the worker
# registry: the registry can hold queue groupings whose per-queue hash is
# empty (e.g. after their workers were removed), which would otherwise be
# reported as "contains worker PIDs: []".
def report_worker_pool_pids
  pids = all_pids
  if pids.empty?
    log "Pool is empty"
  else
    log "Pool contains worker PIDs: #{pids.inspect}"
  end
end

#reset_sig_handlers!Object



177
178
179
# File 'lib/qless/pool.rb', line 177

# Restores the default handler for every signal listed in QUEUE_SIGS.
#
# @return [Array] QUEUE_SIGS
def reset_sig_handlers!
  QUEUE_SIGS.each do |queued_signal|
    trap(queued_signal, "DEFAULT")
  end
end

#running_worker_countObject

Uses qless to count the workers currently running on this machine, so that we don't double up after a restart while long-running jobs are still active.



361
362
363
364
365
366
367
368
369
370
371
# File 'lib/qless/pool.rb', line 361

# Counts the workers that qless reports for this machine.
#
# Each entry's 'name' is compared against "<hostname>-<suffix>". The name is
# split on its LAST hyphen so that hostnames which themselves contain
# hyphens (e.g. "web-01") are matched correctly; splitting on every hyphen
# would truncate such hostnames and never count their workers.
#
# @return [Integer] number of reported workers whose host part equals
#   Socket.gethostname
def running_worker_count
  # may want to do a zcard on ql:workers instead
  machine_hostname = Socket.gethostname
  pool_factory.client.workers.counts.count do |worker|
    hostname, _separator, _pid = worker['name'].rpartition('-')
    hostname == machine_hostname
  end
end

#self_pipeObject

Sig handlers and self pipe management.



137
# File 'lib/qless/pool.rb', line 137

# Lazily-created Array holding the two ends of the self-pipe (empty until
# #init_self_pipe! fills it).
#
# @return [Array]
def self_pipe
  return @self_pipe if @self_pipe

  @self_pipe = []
end

#shutdown_everything_now!(signal) ⇒ Object



233
234
235
236
237
# File 'lib/qless/pool.rb', line 233

# Logs the received signal, sends TERM to every worker and tells the caller
# to leave the master loop.
#
# @param signal [Symbol, String] signal name, interpolated into the log line
# @return [Symbol] always :break
def shutdown_everything_now!(signal)
  message = "#{signal}: immediate shutdown (and immediate worker shutdown)"
  log(message)
  signal_all_workers(:TERM)
  :break
end

#sig_queueObject



138
# File 'lib/qless/pool.rb', line 138

# Lazily-created Array of signals deferred by the trap handlers for later
# processing in the master loop.
#
# @return [Array]
def sig_queue
  return @sig_queue if @sig_queue

  @sig_queue = []
end

#signal_all_workers(signal) ⇒ Object



323
324
325
326
327
# File 'lib/qless/pool.rb', line 323

# Sends +signal+ to every PID currently in the pool.
#
# @param signal [Symbol, String, Integer] passed straight to Process.kill
# @return [Array] the PIDs that were signalled
def signal_all_workers(signal)
  all_pids.each { |worker_pid| Process.kill(signal, worker_pid) }
end

#spawn_missing_workers_for(queues, delta) ⇒ Object

Methods that operate on a single grouping of queues (perhaps this means a class is waiting to be extracted).



348
349
350
# File 'lib/qless/pool.rb', line 348

# Spawns +delta+ new workers for the given queue grouping.
#
# @param queues [Object] queue grouping key
# @param delta [Integer] how many workers to start
# @return [Integer] +delta+
def spawn_missing_workers_for(queues, delta)
  delta.times do
    spawn_worker!(queues)
  end
end

#spawn_worker!(queues) ⇒ Object



387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
# File 'lib/qless/pool.rb', line 387

# Forks one worker process for the given queue grouping and records it in
# the worker registry under the child's PID.
#
# @param queues [Object] queue grouping key, also passed to #create_worker
# @return [Object] the worker object stored in the registry
def spawn_worker!(queues)
  # Built in the parent, before forking, so the parent keeps a handle on it.
  worker = create_worker(queues)
  pid = fork do
    # This var gets cached, so need to clear it out in forks
    # so that workers report the correct name to qless
    Qless.instance_variable_set(:@worker_name, nil)
    pool_factory.client.redis.client.reconnect
    log_worker "Starting worker #{worker}"
    call_after_prefork!
    # The child must not keep the master's deferred-signal handlers.
    reset_sig_handlers!
    #self_pipe.each {|io| io.close }
    begin
      worker.work((ENV['INTERVAL'] || DEFAULT_WORKER_INTERVAL).to_i) # interval, will block
    rescue Errno::EINTR
      log "Caught interrupted system call Errno::EINTR. Retrying."
      retry
    end
  end
  workers[queues][pid] = worker
end

#startObject

Start, join, and master sleep.



242
243
244
245
246
247
248
249
250
251
# File 'lib/qless/pool.rb', line 242

# Boots the master: sets up the self-pipe and signal handlers, spawns the
# initial workers, and logs the resulting pool.
#
# @return [self] so the call can be chained (e.g. start.join)
def start
  procline("(starting)")
  # Pipe and handlers are set up before any worker is spawned.
  init_self_pipe!
  init_sig_handlers!
  maintain_worker_count
  procline("(started)")
  log "started manager"
  report_worker_pool_pids
  self
end

#trap_deferred(signal) ⇒ Object

defer a signal for later processing in #join (master process)



162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/qless/pool.rb', line 162

# Installs a handler for +signal+ that queues it for later processing in
# #join instead of acting on it immediately.
#
# @param signal [Symbol] signal name to trap
def trap_deferred(signal)
  trap(signal) do |sig_nr|
    # While #reap_all_workers is blocking (flag set there), INT/TERM abort
    # the wait by raising; #reap_all_workers rescues QuitNowException.
    if @waiting_for_reaper && [:INT, :TERM].include?(signal)
      log "Recieved #{signal}: short circuiting QUIT waitpid"
      raise QuitNowException
    end
    # Bounded queue: signals beyond SIG_QUEUE_MAX_SIZE are logged and dropped.
    if sig_queue.size < SIG_QUEUE_MAX_SIZE
      sig_queue << signal
      awaken_master
    else
      log "ignoring SIG#{signal}, queue=#{sig_queue.inspect}"
    end
  end
end

#worker_delta_for(queues) ⇒ Object



377
378
379
380
381
# File 'lib/qless/pool.rb', line 377

# How many workers a queue grouping is short of (positive) or over
# (negative) its configured count.
#
# A positive shortfall is suppressed (0) when the number of running workers
# already exceeds the configured total.
#
# @param queues [Object] queue grouping key
# @return [Integer]
def worker_delta_for(queues)
  wanted = config.fetch(queues, 0)
  present = workers.fetch(queues, []).size
  shortfall = wanted - present
  return shortfall unless shortfall > 0

  running_worker_count > configured_worker_count ? 0 : shortfall
end