Class: Reqless::Workers::ForkingWorker
- Inherits: BaseWorker
  - Object
    - BaseWorker
      - Reqless::Workers::ForkingWorker
- Defined in: lib/reqless/worker/forking.rb
Instance Attribute Summary
- #max_startup_interval ⇒ Object
  The maximum interval between child startups (defaults to 10.0); spacing out startups reduces the thundering-herd effect.
Attributes inherited from BaseWorker
#interval, #options, #output, #paused, #reserver, #sighup_handler
Instance Method Summary
- #children ⇒ Object
  Returns a list of each of the child pids.
- #extend(mod) ⇒ Object
  Because we spawn a new worker, we need to apply all the modules that extend this one.
- #initialize(reserver, options = {}) ⇒ ForkingWorker (constructor)
  A new instance of ForkingWorker.
- #register_signal_handlers ⇒ Object
  Register our handling of signals.
- #run ⇒ Object
  Run this worker.
- #spawn ⇒ Object
  Spawn a new child worker.
- #stop(signal = 'QUIT') ⇒ Object
  Signal all the children.
- #stop!(signal = 'QUIT') ⇒ Object
  Signal all the children and wait for them to exit.
Methods inherited from BaseWorker
#deregister, #fail_job, #jobs, #listen_for_lost_lock, #log_level, #on_current_job_lock_lost, #pause, #perform, #procline, #safe_trap, #shutdown, #try_complete, #uniq_clients, #unpause
Methods included from BaseWorker::SupportsMiddlewareModules
Constructor Details
#initialize(reserver, options = {}) ⇒ ForkingWorker
Returns a new instance of ForkingWorker.
    # File 'lib/reqless/worker/forking.rb', line 14
    def initialize(reserver, options = {})
      super(reserver, options)
      # The keys are the child PIDs, the values are information about the
      # worker, including its sandbox directory. This directory currently
      # isn't used, but this sets up for having that eventually.
      @sandboxes = {}

      # Save our options for starting children
      @options = options

      # The max interval between when children start (reduces thundering herd)
      @max_startup_interval = options[:max_startup_interval] || 10.0

      # TODO: facter to figure out how many cores we have
      @num_workers = options[:num_workers] || 1

      # All the modules that have been applied to this worker
      @modules = []

      @sandbox_mutex = Mutex.new
    end
Instance Attribute Details
#max_startup_interval ⇒ Object
The maximum interval between child startups. Set from options[:max_startup_interval] in the constructor and defaults to 10.0.

    # File 'lib/reqless/worker/forking.rb', line 12
    def max_startup_interval
      @max_startup_interval
    end
Instance Method Details
#children ⇒ Object
Returns a list of the child PIDs (the keys of @sandboxes).

    # File 'lib/reqless/worker/forking.rb', line 113
    def children
      @sandboxes.keys
    end
#extend(mod) ⇒ Object
Because we spawn a new worker, we need to apply all the modules that extend this one
    # File 'lib/reqless/worker/forking.rb', line 38
    def extend(mod)
      @modules << mod
      super(mod)
    end
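The record-and-replay idea behind #extend can be sketched in isolation. This is a minimal illustration, not Reqless code: ParentSketch, ChildSketch, and Shouting are hypothetical names, and the only behavior borrowed from the source is "remember each module in @modules, then apply them all to the newly built worker".

```ruby
# Hypothetical stand-in for the object that gets built for a child.
class ChildSketch; end

# Hypothetical stand-in for the parent worker.
class ParentSketch
  def initialize
    @modules = [] # every module applied to this instance, in order
  end

  # Record the module, then let Object#extend apply it as usual.
  def extend(mod)
    @modules << mod
    super(mod)
  end

  # Build a fresh child and replay the recorded modules onto it.
  def spawn
    child = ChildSketch.new
    @modules.each { |mod| child.extend(mod) }
    child
  end
end

# Hypothetical middleware-style module.
module Shouting
  def greet
    'HELLO'
  end
end

parent = ParentSketch.new
parent.extend(Shouting)
child = parent.spawn
puts child.greet # the child responds to the module's method as well
```

Without the override, a module applied to the parent instance would affect only that instance's singleton class, so a separately constructed child object would never see it.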
#register_signal_handlers ⇒ Object
Registers the parent process's signal handlers. Most signals are forwarded to the children. TERM, INT, and QUIT additionally wait for the children and then exit; INT is forwarded to the children as TERM.

    # File 'lib/reqless/worker/forking.rb', line 55
    def register_signal_handlers
      # If we're the parent process, we mostly want to forward the signals on
      # to the child processes. It's just that sometimes we want to wait for
      # them and then exit
      trap('TERM') do
        stop!('TERM')
        exit
      end
      trap('INT') do
        stop!('TERM')
        exit
      end
      safe_trap('HUP') { sighup_handler.call }
      safe_trap('QUIT') do
        stop!('QUIT')
        exit
      end
      safe_trap('USR1') { stop!('KILL') }

      begin
        trap('CONT') { stop('CONT') }
        trap('USR2') { stop('USR2') }
      rescue ArgumentError
        warn 'Signals USR2, and/or CONT not supported.'
      end
    end
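The rescue ArgumentError around the CONT and USR2 traps relies on Signal.trap raising ArgumentError for a signal name the platform does not know. A small sketch of that guard follows; trap_if_supported is a hypothetical helper and is not the safe_trap method inherited from BaseWorker, whose implementation is not shown here.

```ruby
# Hypothetical helper, not part of Reqless: installs a handler and reports
# whether the platform accepted the signal name.
def trap_if_supported(signal, &handler)
  Signal.trap(signal, &handler)
  true
rescue ArgumentError
  warn "Signal #{signal} not supported."
  false
end

usr2_ok = trap_if_supported('USR2') { puts 'would forward USR2 to children' }
bogus_ok = trap_if_supported('NOSUCHSIGNAL') { nil }

# Put the default handler back so the sketch leaves no trap installed.
Signal.trap('USR2', 'DEFAULT') if usr2_ok

puts "USR2 trapped: #{usr2_ok}, bogus signal trapped: #{bogus_ok}"
```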
#run ⇒ Object
Runs this worker: starts the child sandboxes, then waits on the child processes and spawns a replacement whenever one exits, until shutdown.

    # File 'lib/reqless/worker/forking.rb', line 85
    def run
      startup_sandboxes

      # Now keep an eye on our child processes, spawn replacements as needed
      loop do
        begin
          # Don't wait on any processes if we're already in shutdown mode.
          break if @shutdown

          # Wait for any child to kick the bucket
          pid, status = Process.wait2
          code, sig = status.exitstatus, status.stopsig
          log(:warn, "Worker process #{pid} died with #{code} from signal (#{sig})")

          # allow our shutdown logic (called from a separate thread) to take effect.
          break if @shutdown

          spawn_replacement_child(pid)
        rescue SystemCallError => e
          log(:error, "Failed to wait for child process: #{e.inspect}")
          # If we're shutting down, the loop above will exit
          exit! unless @shutdown
        end
      end
    end
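The log line in #run reads fields from the Process::Status that Process.wait2 returns. The sketch below shows what those fields hold for a child that exits normally and for one killed by a signal. It assumes a platform with fork; the helper names are hypothetical and not part of Reqless.

```ruby
# Collect the Process::Status fields of interest into a plain hash.
def describe(status)
  { exitstatus: status.exitstatus, # nil unless the child exited normally
    termsig: status.termsig,       # nil unless a signal terminated the child
    stopsig: status.stopsig }      # nil unless the child is stopped
end

# Fork a child that exits immediately with the given status, then reap it.
def reap_child_exiting_with(code)
  pid = fork { exit!(code) }
  _, status = Process.wait2(pid)
  describe(status)
end

# Fork a child that blocks, kill it with the given signal, then reap it.
def reap_child_killed_by(signal)
  pid = fork { sleep }
  Process.kill(signal, pid)
  _, status = Process.wait2(pid)
  describe(status)
end

p reap_child_exiting_with(11)
p reap_child_killed_by('KILL')
```

Note that Process::Status#stopsig is set only for a stopped child; the signal that terminated a child is reported by #termsig.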
#spawn ⇒ Object
Builds a new SerialWorker for a child and applies every module this worker has been extended with. If the current job's lock is lost, the child exits immediately with status 11.

    # File 'lib/reqless/worker/forking.rb', line 44
    def spawn
      worker = SerialWorker.new(reserver, @options)
      # We use 11 as the exit status so that it is something unique
      # (rather than the common 1). Plus, 11 looks a little like
      # ll (i.e. "Lock Lost").
      worker.on_current_job_lock_lost { |job| exit!(11) }
      @modules.each { |mod| worker.extend(mod) }
      worker
    end
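The lock-lost callback calls exit! rather than exit. In Ruby, exit! terminates the process immediately and skips at_exit handlers, while exit runs them. The sketch below shows the difference from the parent's point of view. It assumes a platform with fork; run_in_child and LOCK_LOST_STATUS are hypothetical names, with the value 11 taken from the listing above.

```ruby
LOCK_LOST_STATUS = 11 # mirrors the exit status used in #spawn

# Runs the block in a forked child. Returns the child's exit status and
# whatever its at_exit handler wrote to a pipe shared with the parent.
def run_in_child
  reader, writer = IO.pipe
  pid = fork do
    reader.close
    at_exit { writer.write('at_exit ran') }
    yield
  end
  writer.close
  _, status = Process.wait2(pid)
  output = reader.read
  reader.close
  [status.exitstatus, output]
end

hard = run_in_child { exit!(LOCK_LOST_STATUS) } # at_exit handler is skipped
soft = run_in_child { exit(LOCK_LOST_STATUS) }  # at_exit handler runs

p hard
p soft
```

Either way the parent sees status 11, so a supervisor can tell a lost lock apart from a generic failure with status 1.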
#stop(signal = 'QUIT') ⇒ Object
Sends the given signal to every child process, ignoring children that have already exited.

    # File 'lib/reqless/worker/forking.rb', line 118
    def stop(signal = 'QUIT')
      log(:warn, "Sending #{signal} to children")
      children.each do |pid|
        begin
          Process.kill(signal, pid)
        rescue Errno::ESRCH
          # no such process -- means the process has already died.
        end
      end
    end
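Process.kill raises Errno::ESRCH when the target pid no longer exists, which is why #stop rescues it. A self-contained sketch of the same pattern follows. It assumes a platform with fork; signal_all is a hypothetical helper that, unlike #stop, also returns the pids that were already gone.

```ruby
# Sends `signal` to each pid and returns the pids that no longer existed.
def signal_all(pids, signal = 'QUIT')
  pids.reject do |pid|
    begin
      Process.kill(signal, pid)
      true  # delivered, so drop it from the "missing" list
    rescue Errno::ESRCH
      false # no such process: the child has already died
    end
  end
end

live = fork { sleep }     # stays alive until signalled
dead = fork { exit!(0) }  # exits immediately
Process.wait(dead)        # reap it so the pid no longer refers to a process

missing = signal_all([live, dead], 'KILL')
Process.wait(live)        # reap the child we just killed

puts "already gone: #{missing.inspect}"
```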
#stop!(signal = 'QUIT') ⇒ Object
Signal all the children and wait for them to exit
    # File 'lib/reqless/worker/forking.rb', line 130
    def stop!(signal = 'QUIT')
      shutdown
      shutdown_sandboxes(signal)
    end