Class: Levdon::Worker
- Inherits:
-
Object
- Object
- Levdon::Worker
- Defined in:
- lib/levdon.rb
Overview
Instance Attribute Summary collapse
-
#pid ⇒ Object
readonly
Returns the value of attribute pid.
Instance Method Summary collapse
- #alive? ⇒ Boolean
- #async_execute(*msg) ⇒ Object
- #create_pipe ⇒ Object
- #execute(*msg) ⇒ Object
-
#initialize(&block) ⇒ Worker
constructor
A new instance of Worker.
- #install_exit_handler ⇒ Object
- #install_signal_handler ⇒ Object
- #nonblock_read_from_child ⇒ Object
- #nonblock_write_to_child(obj) ⇒ Object
- #poll ⇒ Object
- #read_from_child ⇒ Object
- #read_from_parent ⇒ Object
- #read_object(read) ⇒ Object
- #run ⇒ Object
- #stop ⇒ Object
- #wait_after_fork ⇒ Object
- #write_object(obj, write) ⇒ Object
- #write_to_child(obj) ⇒ Object
- #write_to_parent(obj) ⇒ Object
Constructor Details
#initialize(&block) ⇒ Worker
Returns a new instance of Worker.
134 135 136 137 138 139 |
# File 'lib/levdon.rb', line 134 def initialize(&block) @child_read, @parent_write = create_pipe @parent_read, @child_write = create_pipe @block = block @io_stream = NonBlockLineStream.new(@parent_read,@parent_write) end |
Instance Attribute Details
#pid ⇒ Object (readonly)
Returns the value of attribute pid.
132 133 134 |
# File 'lib/levdon.rb', line 132 def pid @pid end |
Instance Method Details
#alive? ⇒ Boolean
179 180 181 182 183 184 |
# File 'lib/levdon.rb', line 179 def alive? Process.kill(0, @pid) true rescue Errno::ESRCH false end |
#async_execute(*msg) ⇒ Object
169 170 171 |
# File 'lib/levdon.rb', line 169 def async_execute(*msg) nonblock_write_to_child(msg) end |
#create_pipe ⇒ Object
141 142 143 |
# File 'lib/levdon.rb', line 141 def create_pipe IO.pipe.map{|pipe| pipe.tap{|_| _.set_encoding("ASCII-8BIT", "ASCII-8BIT") } } end |
#execute(*msg) ⇒ Object
164 165 166 167 |
# File 'lib/levdon.rb', line 164 def execute(*msg) write_to_child(msg) Thread.new { read_from_child } end |
#install_exit_handler ⇒ Object
243 244 245 246 247 248 249 250 251 252 253 254 255 256 |
# File 'lib/levdon.rb', line 243 def install_exit_handler at_exit do next unless alive? begin Process.kill("KILL", @pid) Process.wait(@pid) rescue Errno::ESRCH # noop rescue => e puts "error at_exit: #{ e }" raise e end end end |
#install_signal_handler ⇒ Object
258 259 260 261 262 263 264 265 266 |
# File 'lib/levdon.rb', line 258 def install_signal_handler [:INT, :QUIT].each do |signal| old_handler = Signal.trap(signal) { Process.kill(signal, @pid) Process.wait(@pid) old_handler.call } end end |
#nonblock_read_from_child ⇒ Object
200 201 202 203 204 205 206 |
# File 'lib/levdon.rb', line 200 def nonblock_read_from_child() data = @io_stream.read if(data) return Marshal.load(data.chomp.gsub("@NDELIMITER@", "\n")) end return nil end |
#nonblock_write_to_child(obj) ⇒ Object
208 209 210 211 |
# File 'lib/levdon.rb', line 208 def nonblock_write_to_child(obj) data = Marshal.dump(obj).gsub("\n", "@NDELIMITER@") + "\n" @io_stream.write(data) end |
#poll ⇒ Object
186 187 188 |
# File 'lib/levdon.rb', line 186 def poll @io_stream.poll end |
#read_from_child ⇒ Object
213 214 215 |
# File 'lib/levdon.rb', line 213 def read_from_child read_object(@parent_read) end |
#read_from_parent ⇒ Object
221 222 223 |
# File 'lib/levdon.rb', line 221 def read_from_parent read_object(@child_read) end |
#read_object(read) ⇒ Object
195 196 197 198 |
# File 'lib/levdon.rb', line 195 def read_object(read) data = read.gets Marshal.load(data.chomp.gsub("@NDELIMITER@", "\n")) end |
#run ⇒ Object
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
# File 'lib/levdon.rb', line 145 def run @pid = fork do @parent_read.close @parent_write.close write_to_parent(:ready) loop do args = read_from_parent break if args == :stop result = @block.call(*args) write_object(result, @child_write) end @child_read.close @child_write.close end wait_after_fork if @pid end |
#stop ⇒ Object
173 174 175 176 177 |
# File 'lib/levdon.rb', line 173 def stop return unless alive? write_to_child(:stop) Process.wait(@pid) end |
#wait_after_fork ⇒ Object
229 230 231 232 233 234 235 236 237 238 239 240 241 |
# File 'lib/levdon.rb', line 229 def wait_after_fork @child_read.close @child_write.close install_exit_handler install_signal_handler Thread.new { result = read_from_child raise "Failed to start worker pid #{ @pid }" unless result == :ready result } end |
#write_object(obj, write) ⇒ Object
190 191 192 193 |
# File 'lib/levdon.rb', line 190 def write_object(obj, write) data = Marshal.dump(obj).gsub("\n", "@NDELIMITER@") + "\n" write.write data end |
#write_to_child(obj) ⇒ Object
217 218 219 |
# File 'lib/levdon.rb', line 217 def write_to_child(obj) write_object(obj, @parent_write) end |
#write_to_parent(obj) ⇒ Object
225 226 227 |
# File 'lib/levdon.rb', line 225 def write_to_parent(obj) write_object(obj, @child_write) end |