Class: Qless::Workers::ForkingWorker

Inherits:
BaseWorker show all
Defined in:
lib/qless/worker/forking.rb

Instance Attribute Summary collapse

Attributes inherited from BaseWorker

#interval, #options, #output, #paused, #reserver, #sighup_handler

Instance Method Summary collapse

Methods inherited from BaseWorker

#deregister, #fail_job, #jobs, #listen_for_lost_lock, #log_level, #on_current_job_lock_lost, #pause, #perform, #procline, #safe_trap, #shutdown, #try_complete, #uniq_clients, #unpause

Methods included from BaseWorker::SupportsMiddlewareModules

#after_fork, #around_perform

Constructor Details

#initialize(reserver, options = {}) ⇒ ForkingWorker

Returns a new instance of ForkingWorker.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/qless/worker/forking.rb', line 15

# Set up the parent (forking) worker.
#
# @param reserver passed straight through to the superclass constructor
# @param options [Hash] worker options; this constructor itself reads
#   :max_startup_interval (defaults to 10.0) and :num_workers (defaults to 1)
def initialize(reserver, options = {})
  super(reserver, options)

  # Keep the options around; they are reused when starting children
  @options = options

  # TODO: facter to figure out how many cores we have
  @num_workers = options[:num_workers] || 1

  # Upper bound on the delay between child startups, which reduces the
  # thundering-herd effect
  @max_startup_interval = options[:max_startup_interval] || 10.0

  # Child PID => information about that worker, including its sandbox
  # directory. The directory is not used yet; this only prepares for it.
  @sandboxes = {}

  # Every module this worker has been extended with
  @modules = []

  # Trap handlers cannot wait on @sandbox_mutex, so any block they could
  # not run right away is parked in this queue
  @postponed_actions_queue = ::Queue.new
  @sandbox_mutex = Mutex.new
end

Instance Attribute Details

#max_startup_interval ⇒ Object

The child startup interval



13
14
15
# File 'lib/qless/worker/forking.rb', line 13

# Reader for @max_startup_interval, the child startup interval.
#
# @return the current value of @max_startup_interval
def max_startup_interval
  @max_startup_interval
end

Instance Method Details

#children ⇒ Object

Returns a list of each of the child pids



139
140
141
# File 'lib/qless/worker/forking.rb', line 139

# Returns a list of each of the child pids.
#
# The list is the key set of @sandboxes, and a new array is built on every
# call.
#
# @return [Array] the keys of @sandboxes
def children
  @sandboxes.keys
end

#contention_aware_handler(&block) ⇒ Object

If @sandbox_mutex is free, execute block immediately. Otherwise, postpone it until handling is possible



60
61
62
63
64
65
66
67
# File 'lib/qless/worker/forking.rb', line 60

# Run +block+ right away if @sandbox_mutex is free; otherwise park it in
# @postponed_actions_queue so it can be run later.
#
# The lock is taken with the non-blocking try_lock, so this method never
# waits for the mutex.
#
# The mutex is always released once the block finishes, even when the block
# raises (including the SystemExit raised by +exit+). Without that, one
# failing handler would leave the mutex locked and every later handler would
# be postponed indefinitely.
#
# @yield the action to perform while holding @sandbox_mutex
# @return [Object] not meaningful to callers
def contention_aware_handler(&block)
  if @sandbox_mutex.try_lock
    begin
      block.call
    ensure
      @sandbox_mutex.unlock
    end
  else
    @postponed_actions_queue << block
  end
end

#extend(mod) ⇒ Object

Because we spawn a new worker, we need to apply all the modules that extend this one



42
43
44
45
# File 'lib/qless/worker/forking.rb', line 42

# Record +mod+ in @modules before extending this object with it, so the same
# modules can later be applied to the workers we spawn.
#
# @param mod [Module] the module to extend this worker with
# @return whatever the inherited extend returns
def extend(mod)
  @modules.push(mod)
  super
end

#process_postponed_actions ⇒ Object

Process any signals (such as TERM) that could not be processed immediately due to @sandbox_mutex being in use



71
72
73
74
75
76
77
78
79
80
81
# File 'lib/qless/worker/forking.rb', line 71

# Drain @postponed_actions_queue, running each postponed block while
# holding @sandbox_mutex. These are the actions (such as TERM handling)
# that could not run immediately because the mutex was in use.
def process_postponed_actions
  pending = @postponed_actions_queue
  until pending.empty?
    # A signal may interrupt us between the empty? check and the
    # non-blocking pop, but it can only have added more entries to the
    # queue, never removed any.
    action = pending.pop(true)
    @sandbox_mutex.synchronize { action.call }
  end
end

#register_signal_handlers ⇒ Object

Register our handling of signals



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/qless/worker/forking.rb', line 84

# Register our handling of signals.
#
# As the parent process we mostly forward signals to the children; for
# some signals we also wait for them and then exit.
def register_signal_handlers
  # TERM and INT: stop the children with the same signal, then exit
  %w[TERM INT].each do |name|
    trap(name) do
      contention_aware_handler do
        stop!(name, true)
        exit
      end
    end
  end

  # HUP: delegate to the configured sighup handler
  safe_trap('HUP') { sighup_handler.call }

  # QUIT: stop the children with QUIT, then exit
  safe_trap('QUIT') do
    contention_aware_handler do
      stop!('QUIT', true)
      exit
    end
  end

  # USR1: stop the children with KILL; the parent does not exit here
  safe_trap('USR1') do
    contention_aware_handler { stop!('KILL', true) }
  end

  # CONT and USR2 are forwarded as-is. trap raises ArgumentError for a
  # signal it does not support, in which case we only warn.
  begin
    trap('CONT') { stop('CONT', true) }
    trap('USR2') { stop('USR2', true) }
  rescue ArgumentError
    warn 'Signals USR2, and/or CONT not supported.'
  end
end

#run ⇒ Object

Run this worker



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/qless/worker/forking.rb', line 110

# Run this worker: start the sandboxes, then supervise the child processes
# until @shutdown is set.
#
# Each time a child exits, its exit is logged (:info for exit status 0,
# :warn otherwise), a replacement is spawned and any postponed actions are
# processed. If waiting for a child fails with a SystemCallError while we
# are not shutting down, the process exits immediately via exit!.
def run
  startup_sandboxes

  # Now keep an eye on our child processes, spawn replacements as needed
  loop do
    begin
      # Don't wait on any processes if we're already in shutdown mode.
      break if @shutdown

      # Wait for any child to kick the bucket
      pid, status = Process.wait2
      # exitstatus is nil when the child was killed by a signal, and
      # termsig is nil when it exited on its own. stopsig would be wrong
      # here: it only applies to *stopped* children, which wait2 does not
      # report without WUNTRACED, so it would always be nil.
      code, sig = status.exitstatus, status.termsig
      log((code == 0 ? :info : :warn),
        "Worker process #{pid} died with #{code} from signal (#{sig})")

      # Allow our shutdown logic (called from a separate thread) to take
      # effect.
      break if @shutdown

      spawn_replacement_child(pid)
      process_postponed_actions
    rescue SystemCallError => e
      log(:error, "Failed to wait for child process: #{e.inspect}")
      # If we're shutting down, the loop above will exit
      exit! unless @shutdown
    end
  end
end

#spawn ⇒ Object

Spawn a new child worker



48
49
50
51
52
53
54
55
56
# File 'lib/qless/worker/forking.rb', line 48

# Build a new child worker (a SerialWorker) that shares our reserver and
# options and has been extended with every module recorded in @modules.
#
# @return [SerialWorker] the configured child worker
def spawn
  SerialWorker.new(reserver, @options).tap do |child|
    # Exit status 11 is deliberately distinctive (rather than the common
    # 1), and 11 looks a little like "ll", i.e. "Lock Lost".
    child.on_current_job_lock_lost { |_job| exit!(11) }
    @modules.each { |mod| child.extend(mod) }
  end
end

#stop(signal = 'QUIT', in_signal_handler = true) ⇒ Object

Signal all the children



144
145
146
147
148
149
150
151
152
153
# File 'lib/qless/worker/forking.rb', line 144

# Send +signal+ to every child process.
#
# @param signal the signal to deliver, passed to Process.kill
# @param in_signal_handler when truthy, the warning log line is skipped
# @return the list of child pids that was iterated
def stop(signal = 'QUIT', in_signal_handler=true)
  unless in_signal_handler
    log(:warn, "Sending #{signal} to children")
  end

  children.each do |child_pid|
    begin
      Process.kill(signal, child_pid)
    rescue Errno::ESRCH
      # No such process: the child has already died, nothing left to signal.
      next
    end
  end
end

#stop!(signal = 'QUIT', in_signal_handler = true) ⇒ Object

Signal all the children and wait for them to exit. Should only be called when we have the lock on @sandbox_mutex



157
158
159
160
# File 'lib/qless/worker/forking.rb', line 157

# Signal all the children and wait for them to exit. Should only be called
# while holding the lock on @sandbox_mutex.
#
# Calls shutdown first and then shutdown_sandboxes, passing
# +in_signal_handler+ on to both.
#
# @param signal the signal handed to shutdown_sandboxes
# @param in_signal_handler forwarded unchanged to both calls
# @return whatever shutdown_sandboxes returns
def stop!(signal = 'QUIT', in_signal_handler=true)
  shutdown(in_signal_handler)
  shutdown_sandboxes(signal, in_signal_handler)
end