Class: ConcurrentWorker::ConcurrentProcess::IPCDuplexChannel
- Inherits:
-
Object
- Object
- ConcurrentWorker::ConcurrentProcess::IPCDuplexChannel
- Defined in:
- lib/concurrent_worker.rb
Instance Method Summary collapse
- #choose_io ⇒ Object
- #close ⇒ Object
-
#initialize ⇒ IPCDuplexChannel
constructor
A new instance of IPCDuplexChannel.
- #recv ⇒ Object
- #send(obj) ⇒ Object
Constructor Details
#initialize ⇒ IPCDuplexChannel
Returns a new instance of IPCDuplexChannel.
261 262 263 264 265 |
# File 'lib/concurrent_worker.rb', line 261 def initialize @p_pid = Process.pid @p2c = IO.pipe('ASCII-8BIT', 'ASCII-8BIT') @c2p = IO.pipe('ASCII-8BIT', 'ASCII-8BIT') end |
Instance Method Details
#choose_io ⇒ Object
267 268 269 270 271 |
# File 'lib/concurrent_worker.rb', line 267 def choose_io w_pipe, r_pipe = @p_pid == Process.pid ? [@p2c, @c2p] : [@c2p, @p2c] @wio, @rio = w_pipe[1], r_pipe[0] [w_pipe[0], r_pipe[1]].map(&:close) end |
#close ⇒ Object
297 298 299 |
# File 'lib/concurrent_worker.rb', line 297 def close [@wio, @rio].map(&:close) end |
#recv ⇒ Object
284 285 286 287 288 289 290 291 292 293 294 295 |
# File 'lib/concurrent_worker.rb', line 284 def recv begin Thread.handle_interrupt(Object => :on_blocking) do szdata = @rio.read(4) return [] if szdata.nil? size = szdata.unpack("I")[0] Marshal.load(@rio.read(size)) end rescue IOError raise StopIteration end end |
#send(obj) ⇒ Object
273 274 275 276 277 278 279 280 281 282 |
# File 'lib/concurrent_worker.rb', line 273 def send(obj) begin Thread.handle_interrupt(Object => :never) do data = Marshal.dump(obj) @wio.write([data.size].pack("I")) @wio.write(data) end rescue Errno::EPIPE end end |