Class: ConcurrentWorker::IPCDuplexChannel
- Inherits:
-
Object
- Object
- ConcurrentWorker::IPCDuplexChannel
- Defined in:
- lib/concurrent_worker/common.rb
Instance Method Summary collapse
- #choose_io ⇒ Object
- #close ⇒ Object
-
#initialize ⇒ IPCDuplexChannel
constructor
A new instance of IPCDuplexChannel.
- #recv ⇒ Object
- #send(obj) ⇒ Object
Constructor Details
#initialize ⇒ IPCDuplexChannel
Returns a new instance of IPCDuplexChannel.
50 51 52 53 54 |
# File 'lib/concurrent_worker/common.rb', line 50 def initialize @p_pid = Process.pid @p2c = IO.pipe('ASCII-8BIT', 'ASCII-8BIT') @c2p = IO.pipe('ASCII-8BIT', 'ASCII-8BIT') end |
Instance Method Details
#choose_io ⇒ Object
56 57 58 59 60 |
# File 'lib/concurrent_worker/common.rb', line 56 def choose_io w_pipe, r_pipe = @p_pid == Process.pid ? [@p2c, @c2p] : [@c2p, @p2c] @wio, @rio = w_pipe[1], r_pipe[0] [w_pipe[0], r_pipe[1]].map(&:close) end |
#close ⇒ Object
86 87 88 |
# File 'lib/concurrent_worker/common.rb', line 86 def close [@wio, @rio].map(&:close) end |
#recv ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/concurrent_worker/common.rb', line 73 def recv begin Thread.handle_interrupt(Object => :on_blocking) do szdata = @rio.read(4) return [] if szdata.nil? size = szdata.unpack("I")[0] Marshal.load(@rio.read(size)) end rescue IOError raise StopIteration end end |
#send(obj) ⇒ Object
62 63 64 65 66 67 68 69 70 71 |
# File 'lib/concurrent_worker/common.rb', line 62 def send(obj) begin Thread.handle_interrupt(Object => :never) do data = Marshal.dump(obj) @wio.write([data.size].pack("I")) @wio.write(data) end rescue Errno::EPIPE end end |