Class: PreforkEngine

Inherits:
Object
  • Object
show all
Defined in:
lib/prefork_engine.rb,
lib/prefork_engine/version.rb

Constant Summary collapse

VERSION =
"0.0.7"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ PreforkEngine

Returns a new instance of PreforkEngine.


9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/prefork_engine.rb', line 9

def initialize(options={})
  defaults = {
    "max_workers"          => 10,
    "spawn_interval"       => 0,
    "err_respawn_interval" => 1,
    "trap_signals"         => {
      "TERM" => "TERM"
    },
    "before_fork"          => nil,
    "after_fork"           => nil,
    "on_child_reap"        => nil,
  }
  @options = defaults.merge(options)
  @signal_received = ""
  @manager_pid = ""
  @generation = 0
  @_no_adjust_until = 0.0
  @in_child = false
  @worker_pids = {}
  @delayed_task = nil
  @options["trap_signals"].each do |k,kv|
    Signal.trap(k) do |signo|
      @signal_received = Signal.signame(signo)
    end
  end
  Signal.trap("CHLD") do
    #do nothing
  end
end

Instance Attribute Details

#manager_pidObject (readonly)

Returns the value of attribute manager_pid


7
8
9
# File 'lib/prefork_engine.rb', line 7

def manager_pid
  @manager_pid
end

#signal_receivedObject (readonly)

Returns the value of attribute signal_received


6
7
8
# File 'lib/prefork_engine.rb', line 6

def signal_received
  @signal_received
end

Instance Method Details

#_action_for(sig) ⇒ Object

_handle_delayed_task


144
145
146
147
148
149
# File 'lib/prefork_engine.rb', line 144

def _action_for(sig)
  return nil if !@options["trap_signals"].has_key?(sig)
  t = @options["trap_signals"][sig]
  t = [t,0] if !t.kind_of?(Enumerable)
  return t
end

#_decide_actionObject


124
125
126
127
# File 'lib/prefork_engine.rb', line 124

def _decide_action
  return 1 if self.num_workers < @options["max_workers"]
  return 0
end

#_handle_delayed_taskObject


135
136
137
138
139
140
141
142
# File 'lib/prefork_engine.rb', line 135

def _handle_delayed_task
  while true
    return nil if !@delayed_task
    timeleft = @delayed_task_at - Time.now.to_f
    return timeleft if timeleft > 0
    @delayed_task.call
  end
end

#_max_waitObject

_wait


206
207
208
# File 'lib/prefork_engine.rb', line 206

def _max_wait
  return nil
end

#_on_child_reap(pid, status) ⇒ Object

_decide_action


129
130
131
132
133
# File 'lib/prefork_engine.rb', line 129

def _on_child_reap(pid,status)
  if @options["on_child_reap"]
    @options["on_child_reap"].call(pid,status)
  end
end

#_update_spawn_delay(secs) ⇒ Object

wait_all_children


175
176
177
# File 'lib/prefork_engine.rb', line 175

def _update_spawn_delay(secs)
  @_no_adjust_until = secs ? Time.now.to_f + secs : 0.0
end

#_wait(block) ⇒ Object


179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/prefork_engine.rb', line 179

def _wait(block)
  if !block
    self._handle_delayed_task()
    return Process.wait3(Process::WNOHANG)
  else
    delayed_task_sleep = self._handle_delayed_task()
    delayed_fork_sleep = self._decide_action > 0 ? [@_no_adjust_until - Time.now.to_f,0].max : nil
    sleep_secs = [delayed_task_sleep,delayed_fork_sleep,self._max_wait].compact
    begin
      if sleep_secs.min != nil
        sleep(sleep_secs.min)
        # nonblock
        return Process.wait3(Process::WNOHANG)
      else
        #block
        return Process.wait3(0)
      end
    rescue Errno::EINTR
      # wait for timer thread?
      sleep 0.02
    rescue Errno::ECHILD
      # nothing
    end
    return nil
  end
end

#num_workersObject

signal_all_children


120
121
122
# File 'lib/prefork_engine.rb', line 120

def num_workers
  return @worker_pids.keys.length
end

#signal_all_children(sig) ⇒ Object

start


114
115
116
117
118
# File 'lib/prefork_engine.rb', line 114

def signal_all_children(sig)
  @worker_pids.keys.sort.each do |pid|
    Process.kill(sig,pid)
  end
end

#start(&block) ⇒ Object


39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/prefork_engine.rb', line 39

def start(&block)
  @manager_pid = $$
  @signal_received = ""
  @generation += 1
  raise "cannot start another process while you are in child process" if @in_child

  # main loop
  while @signal_received.length == 0
    action = @_no_adjust_until <= Time.now.to_f ? self._decide_action() : 0

    if action > 0
      # start a new worker
      if @options["before_fork"]
         @options["before_fork"].call(self)
      end
      pid = nil
      begin
        pid = fork
      rescue => e
        # fork failed
        warn "fork failed:#{e}"
        self._update_spawn_delay(@options["err_respawn_interval"])
        next
      end
        if pid == nil
          @in_child = true
          @options["trap_signals"].each do |k,kv|
            ## Signal.trap(k, 0) #XXX in rspec only?
            Signal.trap(k, "DEFAULT")
          end
          ## Signal.trap("CHLD", 0) #XXX in rspec only?
          Signal.trap("CHLD", "DEFAULT")
          block.call
          exit!(true)
        end
        # parent
        if @options["after_fork"]
           @options["after_fork"].call(self)
        end
        @worker_pids[pid] = @generation
        self._update_spawn_delay(@options["spawn_interval"])
    end
    if r = self._wait(action <= 0)
      self._on_child_reap(r.pid, r.status)
      if @worker_pids.delete(r.pid) == @generation && r.status != 0
        self._update_spawn_delay(@options["err_respawn_interval"])
      end
    end
  end

  # send signals to workers
  if action = self._action_for(@signal_received)
    sig = action[0]
    interval = action[1]
    if interval > 0
      pids = @worker_pids.keys.sort
      @delayed_task = proc {
        pid = pids.shift
        Process.kill(sig, pid)
        if pids.empty?
          @delayed_task = nil
          @delayed_task_at = nil
        else
          @delayed_task_at = Time.now.to_f + interval
        end
      }
      @delayed_task_at = 0.0
      @delayed_task.call
    else
      self.signal_all_children(sig)
    end
  end
  return true
end

#wait_all_children(timeout = 0) ⇒ Object


151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/prefork_engine.rb', line 151

def wait_all_children(timeout = 0)
  wait_loop = proc {
    while !@worker_pids.keys.empty?
      if r = self._wait(true)
        if @worker_pids.delete(r.pid)
          self._on_child_reap(r.pid, r.status)
        end
      end
    end
  }
  if timeout > 0
    begin
     Timeout.timeout(timeout){
       wait_loop.call
     }
   rescue Timeout::Error
     # ignore
    end
  else
    wait_loop.call()
  end
  return self.num_workers();
end