Class: Qless::Workers::ForkingWorker
- Inherits:
  BaseWorker
  - Object
  - BaseWorker
  - Qless::Workers::ForkingWorker
- Defined in:
  lib/qless/worker/forking.rb
Instance Attribute Summary
- #max_startup_interval ⇒ Object
  The child startup interval.
Attributes inherited from BaseWorker
#interval, #options, #output, #paused, #reserver, #sighup_handler
Instance Method Summary
- #children ⇒ Object
  Returns a list of each of the child pids.
- #contention_aware_handler(&block) ⇒ Object
  If @sandbox_mutex is free, execute block immediately.
- #extend(mod) ⇒ Object
  Because we spawn a new worker, we need to apply all the modules that extend this one.
- #initialize(reserver, options = {}) ⇒ ForkingWorker (constructor)
  A new instance of ForkingWorker.
- #process_postponed_actions ⇒ Object
  Process any signals (such as TERM) that could not be processed immediately due to @sandbox_mutex being in use.
- #register_signal_handlers ⇒ Object
  Register our handling of signals.
- #run ⇒ Object
  Run this worker.
- #spawn ⇒ Object
  Spawn a new child worker.
- #stop(signal = 'QUIT', in_signal_handler = true) ⇒ Object
  Signal all the children.
- #stop!(signal = 'QUIT', in_signal_handler = true) ⇒ Object
  Signal all the children and wait for them to exit.
Methods inherited from BaseWorker
#deregister, #fail_job, #jobs, #listen_for_lost_lock, #log_level, #on_current_job_lock_lost, #pause, #perform, #procline, #safe_trap, #shutdown, #try_complete, #uniq_clients, #unpause
Methods included from BaseWorker::SupportsMiddlewareModules
Constructor Details
#initialize(reserver, options = {}) ⇒ ForkingWorker
Returns a new instance of ForkingWorker.
# File 'lib/qless/worker/forking.rb', line 15
def initialize(reserver, options = {})
  super(reserver, options)
  # The keys are the child PIDs, the values are information about the
  # worker, including its sandbox directory. This directory currently
  # isn't used, but this sets up for having that eventually.
  @sandboxes = {}

  # Save our options for starting children
  @options = options

  # The max interval between when children start (reduces thundering herd)
  @max_startup_interval = options[:max_startup_interval] || 10.0

  # TODO: facter to figure out how many cores we have
  @num_workers = options[:num_workers] || 1

  # All the modules that have been applied to this worker
  @modules = []

  @sandbox_mutex = Mutex.new
  # A queue of blocks that are postponed since we cannot get
  # @sandbox_mutex in trap handler
  @postponed_actions_queue = ::Queue.new
end
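The constructor reads two options, :max_startup_interval and :num_workers, and falls back to 10.0 and 1 when they are absent. The sketch below mirrors that fallback outside of Qless. Both helpers (resolve_forking_options and startup_delays) are hypothetical names introduced only for illustration, and the random staggering is an assumption about how a maximum startup interval could spread out child startups, not a copy of the library's startup code.

```ruby
# Hypothetical helper, not part of Qless: resolves the two options the
# constructor reads, using the same `||` fallbacks and the same defaults.
def resolve_forking_options(options = {})
  {
    max_startup_interval: options[:max_startup_interval] || 10.0,
    num_workers: options[:num_workers] || 1
  }
end

# Hypothetical helper (assumption): one random delay per child, each in
# the range [0, max_interval), so children do not all start at once.
def startup_delays(num_workers, max_interval, rng = Random.new)
  Array.new(num_workers) { rng.rand * max_interval }
end

resolved = resolve_forking_options(num_workers: 4)
delays = startup_delays(resolved[:num_workers], resolved[:max_startup_interval])
puts resolved.inspect
puts delays.size
```

Because the fallback uses `||`, an explicit nil or false for either option also selects the default.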
Instance Attribute Details
#max_startup_interval ⇒ Object
The child startup interval
# File 'lib/qless/worker/forking.rb', line 13
def max_startup_interval
  @max_startup_interval
end
Instance Method Details
#children ⇒ Object
Returns a list of each of the child pids
# File 'lib/qless/worker/forking.rb', line 139
def children
  @sandboxes.keys
end
#contention_aware_handler(&block) ⇒ Object
If @sandbox_mutex is free, execute block immediately. Otherwise, postpone it until handling is possible.
# File 'lib/qless/worker/forking.rb', line 60
def contention_aware_handler(&block)
  if @sandbox_mutex.try_lock
    block.call
    @sandbox_mutex.unlock
  else
    @postponed_actions_queue << block
  end
end
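The run-now-or-postpone behaviour can be tried in isolation with a plain Mutex and Queue. The following is a minimal sketch, not the Qless implementation: ContentionDemo and its methods are hypothetical, and the ensure clause around the unlock is an addition that the original method does not have.

```ruby
# Hypothetical stand-in for the worker's lock and postponed-action queue.
class ContentionDemo
  attr_reader :log

  def initialize
    @mutex = Mutex.new
    @postponed = Queue.new
    @log = []
  end

  # Same shape as contention_aware_handler: run the block now if the lock
  # is free, otherwise queue it for later.
  def handle(&block)
    if @mutex.try_lock
      begin
        block.call
      ensure
        @mutex.unlock # addition: release the lock even if the block raises
      end
    else
      @postponed << block
    end
  end

  # Number of blocks waiting to be run.
  def postponed_count
    @postponed.size
  end

  # Simulates work that holds the lock while a handler fires.
  def with_lock_held
    @mutex.synchronize { yield }
  end
end

demo = ContentionDemo.new
demo.handle { demo.log << :ran_immediately }                   # lock is free
demo.with_lock_held { demo.handle { demo.log << :ran_late } }  # lock is busy
puts demo.log.inspect
puts demo.postponed_count
```

Mutex#try_lock returns false instead of blocking when the lock is already held, which is what lets the second block be queued rather than run.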
#extend(mod) ⇒ Object
Because we spawn a new worker, we need to apply all the modules that extend this one
# File 'lib/qless/worker/forking.rb', line 42
def extend(mod)
  @modules << mod
  super(mod)
end
#process_postponed_actions ⇒ Object
Process any signals (such as TERM) that could not be processed immediately due to @sandbox_mutex being in use
# File 'lib/qless/worker/forking.rb', line 71
def process_postponed_actions
  until @postponed_actions_queue.empty?
    # It's possible a signal interrupted us between the empty?
    # and shift calls, but it could have only added more things
    # into @postponed_actions_queue
    block = @postponed_actions_queue.shift(true)
    @sandbox_mutex.synchronize do
      block.call
    end
  end
end
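The drain loop relies on the non-blocking form of Queue#shift, which raises ThreadError on an empty queue instead of waiting. A minimal sketch of the same loop with a standalone queue and mutex follows; drain_postponed is a hypothetical helper name, not a Qless method.

```ruby
# Hypothetical helper: runs every queued action while holding the mutex,
# and returns how many actions were run.
def drain_postponed(queue, mutex)
  count = 0
  until queue.empty?
    action = queue.shift(true) # non-blocking; raises ThreadError if empty
    mutex.synchronize { action.call }
    count += 1
  end
  count
end

results = []
queue = Queue.new
queue << -> { results << :first }
queue << -> { results << :second }

puts drain_postponed(queue, Mutex.new)
puts results.inspect
```

Actions run in the order they were queued, and each one runs with the lock held, so it cannot interleave with other code that synchronizes on the same mutex.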
#register_signal_handlers ⇒ Object
Register our handling of signals
# File 'lib/qless/worker/forking.rb', line 84
def register_signal_handlers
  # If we're the parent process, we mostly want to forward the signals on
  # to the child processes. It's just that sometimes we want to wait for
  # them and then exit
  trap('TERM') do
    contention_aware_handler { stop!('TERM', in_signal_handler=true); exit }
  end
  trap('INT') do
    contention_aware_handler { stop!('INT', in_signal_handler=true); exit }
  end
  safe_trap('HUP') { sighup_handler.call }
  safe_trap('QUIT') do
    contention_aware_handler { stop!('QUIT', in_signal_handler=true); exit }
  end
  safe_trap('USR1') do
    contention_aware_handler { stop!('KILL', in_signal_handler=true) }
  end
  begin
    trap('CONT') { stop('CONT', in_signal_handler=true) }
    trap('USR2') { stop('USR2', in_signal_handler=true) }
  rescue ArgumentError
    warn 'Signals USR2, and/or CONT not supported.'
  end
end
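The rescue ArgumentError at the end of the method guards against platforms that do not know a signal name, since Signal.trap raises ArgumentError in that case. The sketch below isolates that pattern. trap_all is a hypothetical helper, and NOPE is a deliberately invalid signal name used to trigger the error; neither comes from Qless.

```ruby
# Hypothetical helper: installs the same handler for each signal name and
# returns the names that this platform rejected.
def trap_all(signals, &handler)
  unsupported = []
  signals.each do |name|
    begin
      Signal.trap(name) { handler.call(name) }
    rescue ArgumentError
      # Unknown or unsupported signal name on this platform.
      unsupported << name
    end
  end
  unsupported
end

received = []
unsupported = trap_all(%w[USR2 NOPE]) { |name| received << name }
puts unsupported.inspect
```

Unlike the original, which gives up on the whole begin block at the first failure, this variant rescues per signal so that one unsupported name does not prevent the others from being trapped.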
#run ⇒ Object
Run this worker
# File 'lib/qless/worker/forking.rb', line 110
def run
  startup_sandboxes

  # Now keep an eye on our child processes, spawn replacements as needed
  loop do
    begin
      # Don't wait on any processes if we're already in shutdown mode.
      break if @shutdown

      # Wait for any child to kick the bucket
      pid, status = Process.wait2
      code, sig = status.exitstatus, status.stopsig
      log((code == 0 ? :info : :warn),
        "Worker process #{pid} died with #{code} from signal (#{sig})")

      # allow our shutdown logic (called from a separate thread) to take effect.
      break if @shutdown

      spawn_replacement_child(pid)
      process_postponed_actions
    rescue SystemCallError => e
      log(:error, "Failed to wait for child process: #{e.inspect}")
      # If we're shutting down, the loop above will exit
      exit! unless @shutdown
    end
  end
end
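Process.wait2 returns the pid of a reaped child together with a Process::Status. The sketch below shows how such a status can be read for a child that exits on its own (using 11, the exit code #spawn reserves for a lost lock) and for one that is killed. describe_exit is a hypothetical helper. Note that Process::Status#stopsig is only set for a stopped child, so this sketch reads termsig to find the signal that ended a process; that choice is an assumption about what a reader would want to log, not a description of Qless behaviour.

```ruby
# Hypothetical helper: describes how a reaped child ended.
def describe_exit(status)
  if status.signaled?
    "killed by signal #{status.termsig}"
  else
    "exited with status #{status.exitstatus}"
  end
end

# A child that exits on its own with status 11.
pid = fork { exit!(11) }
_, status = Process.wait2(pid)
puts describe_exit(status)

# A child that is terminated by a signal; exitstatus is nil in this case.
killed_pid = fork { sleep }
Process.kill('KILL', killed_pid)
_, killed_status = Process.wait2(killed_pid)
puts describe_exit(killed_status)
```

This requires a platform that supports fork.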
#spawn ⇒ Object
Spawn a new child worker
# File 'lib/qless/worker/forking.rb', line 48
def spawn
  worker = SerialWorker.new(reserver, @options)
  # We use 11 as the exit status so that it is something unique
  # (rather than the common 1). Plus, 11 looks a little like
  # ll (i.e. "Lock Lost").
  worker.on_current_job_lock_lost { |job| exit!(11) }
  @modules.each { |mod| worker.extend(mod) }
  worker
end
#stop(signal = 'QUIT', in_signal_handler = true) ⇒ Object
Signal all the children
# File 'lib/qless/worker/forking.rb', line 144
def stop(signal = 'QUIT', in_signal_handler=true)
  log(:warn, "Sending #{signal} to children") unless in_signal_handler
  children.each do |pid|
    begin
      Process.kill(signal, pid)
    rescue Errno::ESRCH
      # no such process -- means the process has already died.
    end
  end
end
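Signalling a pid that has already exited and been reaped raises Errno::ESRCH, which is why the loop rescues it and moves on. A small sketch of the same loop outside the worker follows; signal_all is a hypothetical helper that additionally reports which pids were already gone, something the original method does not do.

```ruby
# Hypothetical helper: sends the signal to each pid and returns the pids
# that no longer existed.
def signal_all(signal, pids)
  pids.reject do |pid|
    begin
      Process.kill(signal, pid)
      true  # delivered, so drop it from the "already gone" list
    rescue Errno::ESRCH
      false # no such process; keep it in the result
    end
  end
end

live = fork { sleep }
dead = fork { exit!(0) }
Process.wait(dead) # reap it so the pid no longer exists

gone = signal_all('TERM', [live, dead])
Process.wait(live) # reap the child that TERM just terminated
puts gone == [dead]
```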
#stop!(signal = 'QUIT', in_signal_handler = true) ⇒ Object
Signal all the children and wait for them to exit. Should only be called while holding the lock on @sandbox_mutex.
# File 'lib/qless/worker/forking.rb', line 157
def stop!(signal = 'QUIT', in_signal_handler=true)
  shutdown(in_signal_handler=in_signal_handler)
  shutdown_sandboxes(signal, in_signal_handler=in_signal_handler)
end