Class: PreforkEngine

Inherits:
Object
  • Object
show all
Defined in:
lib/prefork_engine.rb,
lib/prefork_engine/version.rb

Constant Summary collapse

VERSION =
"0.0.4"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ PreforkEngine

Returns a new instance of PreforkEngine.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/prefork_engine.rb', line 8

def initialize(options={})
  defaults = {
    "max_workers"          => 10,
    "spawn_interval"       => 0,
    "err_respawn_interval" => 1,
    "trap_signals"         => {
      "TERM" => "TERM"
    },
    "before_fork"          => nil,
    "after_fork"           => nil,
    "on_child_reap"        => nil,
  }
  @options = defaults.merge(options)
  @signal_received = ""
  @manager_pid = ""
  @generation = 0
  @_no_adjust_until = 0.0
  @in_child = false
  @worker_pids = {}
  @delayed_task = nil
  @options["trap_signals"].each do |k,kv|
    Signal.trap(k) { |signo|
      @signal_received = Signal.signame(signo)
    }
  end
  Signal.trap("CHLD") {
    #do nothing
  }
end

Instance Attribute Details

#manager_pidObject (readonly)

Returns the value of attribute manager_pid.



6
7
8
# File 'lib/prefork_engine.rb', line 6

def manager_pid
  @manager_pid
end

#signal_receivedObject (readonly)

Returns the value of attribute signal_received.



5
6
7
# File 'lib/prefork_engine.rb', line 5

def signal_received
  @signal_received
end

Instance Method Details

#_action_for(sig) ⇒ Object

_handle_delayed_task



142
143
144
145
146
147
# File 'lib/prefork_engine.rb', line 142

def _action_for(sig)
  return nil if !@options["trap_signals"].has_key?(sig)
  t = @options["trap_signals"][sig]
  t = [t,0] if !t.kind_of?(Enumerable)
  return t
end

#_decide_actionObject



122
123
124
125
# File 'lib/prefork_engine.rb', line 122

def _decide_action
  return 1 if self.num_workers < @options["max_workers"]
  return 0
end

#_handle_delayed_taskObject



133
134
135
136
137
138
139
140
# File 'lib/prefork_engine.rb', line 133

def _handle_delayed_task
  while true
    return nil if !@delayed_task
    timeleft = @delayed_task_at - Time.now.to_f;
    return timeleft if timeleft > 0
    @delayed_task.call
  end
end

#_max_waitObject

_wait



188
189
190
# File 'lib/prefork_engine.rb', line 188

def _max_wait
  return nil
end

#_on_child_reap(pid, status) ⇒ Object

_decide_action



127
128
129
130
131
# File 'lib/prefork_engine.rb', line 127

def _on_child_reap(pid,status)
  if @options["on_child_reap"] then
    @options["on_child_reap"].call(pid,status)
  end
end

#_update_spawn_delay(secs) ⇒ Object

wait_all_children



161
162
163
# File 'lib/prefork_engine.rb', line 161

def _update_spawn_delay(secs)
  @_no_adjust_until = secs ? Time.now.to_f + secs : 0.0
end

#_waitObject



165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/prefork_engine.rb', line 165

def _wait()
  #XXX always blocking
  delayed_task_sleep = self._handle_delayed_task()
  delayed_fork_sleep = self._decide_action > 0 ? [@_no_adjust_until - Time.now.to_f,0].max : nil
  sleep_secs = [delayed_task_sleep,delayed_fork_sleep,self._max_wait].select {|v| v != nil}
  begin
    if sleep_secs.min != nil then
      sleep(sleep_secs.min)
      # nonblock
      return Process.wait3(1)
    else
      #block
      return Process.wait3(0)
    end
  rescue Errno::EINTR
    # wait for timer thread?
    sleep 0.02
  rescue Errno::ECHILD
    # nothing
  end
  return nil
end

#num_workersObject

signal_all_children



118
119
120
# File 'lib/prefork_engine.rb', line 118

def num_workers
  return @worker_pids.keys.length
end

#signal_all_children(sig) ⇒ Object

start



112
113
114
115
116
# File 'lib/prefork_engine.rb', line 112

def signal_all_children(sig)
  @worker_pids.keys.sort.each do |pid|
    Process.kill(sig,pid)
  end
end

#start(&block) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/prefork_engine.rb', line 38

def start(&block)
  @manager_pid = $$
  @signal_received = ""
  @generation += 1
  raise "cannot start another process while you are in child process" if @in_child

  # main loop
  while @signal_received.length == 0
    action = self._decide_action() if @_no_adjust_until <= Time.now.to_f
    if action > 0 then
      # start a new worker
      if @options["before_fork"] then
         @options["before_fork"].call(self)
      end
      pid = nil
      begin
        pid = fork
      rescue => e
        # fork failed
        warn "fork failed:#{e}";
        self._update_spawn_delay(@options["err_respawn_interval"])
        next
      end
        if pid == nil then
          @in_child = true
          @options["trap_signals"].each do |k,kv|
            ## Signal.trap(k, 0) #XXX in rspec only?
            Signal.trap(k, "DEFAULT")
          end
          ## Signal.trap("CHLD", 0) #XXX in rspec only?
          Signal.trap("CHLD", "DEFAULT")
          block.call
          exit!(true)
        end
        # parent
        if @options["after_fork"] then
           @options["after_fork"].call(self)
        end
        @worker_pids[pid] = @generation
        self._update_spawn_delay(@options["spawn_interval"])
    end
    if r = self._wait() then
      self._on_child_reap(r.pid, r.status)
      if @worker_pids.delete(r.pid) == @generation && r.status != 0 then
        self._update_spawn_delay(@options["err_respawn_interval"])
      end
    end
  end

  # send signals to workers
  if action = self._action_for(@signal_received) then
    sig = action[0]
    interval = action[1]
    if interval > 0 then
      pids = @worker_pids.keys.sort
      @delayed_task = proc {
        pid = pids.shift
        Process.kill(sig, pid)
        if pids.empty? then
          @delayed_task = nil
          @delayed_task_at = nil
        else
          @delayed_task_at = Time.now.to_f + interval
        end
      }
      @delayed_task_at = 0.0
      @delayed_task.call
    else
      self.signal_all_children(sig)
    end
  end
  return true
end

#wait_all_childrenObject



149
150
151
152
153
154
155
156
157
158
159
# File 'lib/prefork_engine.rb', line 149

def wait_all_children
  #XXX todo timeout
  while !@worker_pids.keys.empty?
    if r = self._wait() then
      if @worker_pids.delete(r.pid) then
        self._on_child_reap(r.pid, r.status)
      end
    end
  end
  return 0
end