Class: Reqless::Workers::ForkingWorker

Inherits:
BaseWorker
  • Object
show all
Defined in:
lib/reqless/worker/forking.rb

Instance Attribute Summary collapse

Attributes inherited from BaseWorker

#interval, #options, #output, #paused, #reserver, #sighup_handler

Instance Method Summary collapse

Methods inherited from BaseWorker

#deregister, #fail_job, #jobs, #listen_for_lost_lock, #log_level, #on_current_job_lock_lost, #pause, #perform, #procline, #safe_trap, #shutdown, #try_complete, #uniq_clients, #unpause

Methods included from BaseWorker::SupportsMiddlewareModules

#after_fork, #around_perform

Constructor Details

#initialize(reserver, options = {}) ⇒ ForkingWorker

Returns a new instance of ForkingWorker.



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/reqless/worker/forking.rb', line 14

def initialize(reserver, options = {})
  super(reserver, options)
  # The keys are the child PIDs, the values are information about the
  # worker, including its sandbox directory. This directory currently
  # isn't used, but this sets up for having that eventually.
  @sandboxes = {}

  # Save our options for starting children
  @options = options

  # The max interval between when children start (reduces thundering herd)
  @max_startup_interval = options[:max_startup_interval] || 10.0

  # TODO: facter to figure out how many cores we have
  @num_workers = options[:num_workers] || 1

  # All the modules that have been applied to this worker
  @modules = []

  @sandbox_mutex = Mutex.new
end

Instance Attribute Details

#max_startup_intervalObject

The child startup interval



12
13
14
# File 'lib/reqless/worker/forking.rb', line 12

def max_startup_interval
  @max_startup_interval
end

Instance Method Details

#childrenObject

Returns a list of each of the child pids



113
114
115
# File 'lib/reqless/worker/forking.rb', line 113

def children
  @sandboxes.keys
end

#extend(mod) ⇒ Object

Because we spawn a new worker, we need to apply all the modules that extend this one



38
39
40
41
# File 'lib/reqless/worker/forking.rb', line 38

def extend(mod)
  @modules << mod
  super(mod)
end

#register_signal_handlersObject

Register our handling of signals



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/reqless/worker/forking.rb', line 55

def register_signal_handlers
  # If we're the parent process, we mostly want to forward the signals on
  # to the child processes. It's just that sometimes we want to wait for
  # them and then exit
  trap('TERM') do
    stop!('TERM')
    exit
  end

  trap('INT') do
    stop!('TERM')
    exit
  end

  safe_trap('HUP') { sighup_handler.call }
  safe_trap('QUIT') do
    stop!('QUIT')
    exit
  end
  safe_trap('USR1') { stop!('KILL') }

  begin
    trap('CONT') { stop('CONT') }
    trap('USR2') { stop('USR2') }
  rescue ArgumentError
    warn 'Signals USR2, and/or CONT not supported.'
  end
end

#runObject

Run this worker



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/reqless/worker/forking.rb', line 85

def run
  startup_sandboxes

  # Now keep an eye on our child processes, spawn replacements as needed
  loop do
    begin
      # Don't wait on any processes if we're already in shutdown mode.
      break if @shutdown

      # Wait for any child to kick the bucket
      pid, status = Process.wait2
      code, sig = status.exitstatus, status.stopsig
      log(:warn,
        "Worker process #{pid} died with #{code} from signal (#{sig})")

      # allow our shutdown logic (called from a separate thread) to take affect.
      break if @shutdown

      spawn_replacement_child(pid)
    rescue SystemCallError => e
      log(:error, "Failed to wait for child process: #{e.inspect}")
      # If we're shutting down, the loop above will exit
      exit! unless @shutdown
    end
  end
end

#spawnObject

Spawn a new child worker



44
45
46
47
48
49
50
51
52
# File 'lib/reqless/worker/forking.rb', line 44

def spawn
  worker = SerialWorker.new(reserver, @options)
  # We use 11 as the exit status so that it is something unique
  # (rather than the common 1). Plus, 11 looks a little like
  # ll (i.e. "Lock Lost").
  worker.on_current_job_lock_lost { |job| exit!(11) }
  @modules.each { |mod| worker.extend(mod) }
  worker
end

#stop(signal = 'QUIT') ⇒ Object

Signal all the children



118
119
120
121
122
123
124
125
126
127
# File 'lib/reqless/worker/forking.rb', line 118

def stop(signal = 'QUIT')
  log(:warn, "Sending #{signal} to children")
  children.each do |pid|
    begin
      Process.kill(signal, pid)
    rescue Errno::ESRCH
      # no such process -- means the process has already died.
    end
  end
end

#stop!(signal = 'QUIT') ⇒ Object

Signal all the children and wait for them to exit



130
131
132
133
# File 'lib/reqless/worker/forking.rb', line 130

def stop!(signal = 'QUIT')
  shutdown
  shutdown_sandboxes(signal)
end