Class: ConcurrentWorker::ConcurrentProcess::IPCDuplexChannel

Inherits:
Object
  • Object
show all
Defined in:
lib/concurrent_worker.rb

Instance Method Summary collapse

Constructor Details

#initializeIPCDuplexChannel

Returns a new instance of IPCDuplexChannel.



261
262
263
264
265
# File 'lib/concurrent_worker.rb', line 261

def initialize
  @p_pid = Process.pid
  @p2c = IO.pipe('ASCII-8BIT', 'ASCII-8BIT')
  @c2p = IO.pipe('ASCII-8BIT', 'ASCII-8BIT')
end

Instance Method Details

#choose_ioObject



267
268
269
270
271
# File 'lib/concurrent_worker.rb', line 267

def choose_io
  w_pipe, r_pipe = @p_pid == Process.pid ? [@p2c, @c2p] : [@c2p, @p2c]
  @wio, @rio = w_pipe[1], r_pipe[0]
  [w_pipe[0], r_pipe[1]].map(&:close)
end

#closeObject



297
298
299
# File 'lib/concurrent_worker.rb', line 297

def close
  [@wio, @rio].map(&:close)
end

#recvObject



284
285
286
287
288
289
290
291
292
293
294
295
# File 'lib/concurrent_worker.rb', line 284

def recv
  begin
    Thread.handle_interrupt(Object => :on_blocking) do
      szdata = @rio.read(4)
      return [] if szdata.nil?
      size = szdata.unpack("I")[0]
      Marshal.load(@rio.read(size))
    end
  rescue IOError
    raise StopIteration
  end
end

#send(obj) ⇒ Object



273
274
275
276
277
278
279
280
281
282
# File 'lib/concurrent_worker.rb', line 273

def send(obj)
  begin
    Thread.handle_interrupt(Object => :never) do
      data = Marshal.dump(obj)
      @wio.write([data.size].pack("I"))
      @wio.write(data)
    end
  rescue Errno::EPIPE
  end
end