Class: ConcurrentWorker::IPCDuplexChannel

Inherits:
Object
  • Object
show all
Defined in:
lib/concurrent_worker/common.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ IPCDuplexChannel

Returns a new instance of IPCDuplexChannel.



50
51
52
53
54
# File 'lib/concurrent_worker/common.rb', line 50

# Records the pid of the process that creates the channel and opens the two
# pipes it is built on: @p2c and @c2p. Each pipe is the [read_io, write_io]
# pair returned by IO.pipe, created with 'ASCII-8BIT' for both encodings.
def initialize
  open_pipe = -> { IO.pipe('ASCII-8BIT', 'ASCII-8BIT') }

  @p_pid = Process.pid
  @p2c = open_pipe.call
  @c2p = open_pipe.call
end

Instance Method Details

#choose_io ⇒ Object



56
57
58
59
60
# File 'lib/concurrent_worker/common.rb', line 56

# Selects this process's ends of the two pipes and closes the other two.
#
# When the current pid equals the pid recorded in @p_pid, the process writes
# to @p2c and reads from @c2p; otherwise the roles are swapped. The chosen
# write end is stored in @wio and the chosen read end in @rio.
#
# Returns the array of results from closing the two unused ends.
def choose_io
  outbound, inbound =
    if Process.pid == @p_pid
      [@p2c, @c2p]
    else
      [@c2p, @p2c]
    end

  # Each pipe is a [read_io, write_io] pair.
  @wio = outbound.last
  @rio = inbound.first

  # Close the ends this process does not use: the read end of the outbound
  # pipe, then the write end of the inbound pipe.
  [outbound.first, inbound.last].map { |io| io.close }
end

#close ⇒ Object



86
87
88
# File 'lib/concurrent_worker/common.rb', line 86

# Closes the write end (@wio) and then the read end (@rio) held by this
# process. Returns the two close results as an array.
def close
  [@wio.close, @rio.close]
end

#recv ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/concurrent_worker/common.rb', line 73

# Reads one length-prefixed message from @rio and returns the unmarshalled
# object.
#
# The frame is a 4-byte size header (unpacked with the "I" directive)
# followed by that many bytes of Marshal data. The read runs under
# Thread.handle_interrupt(Object => :on_blocking).
#
# Returns [] when the header read yields nil (end of stream).
# Raises StopIteration when reading raises IOError.
def recv
  Thread.handle_interrupt(Object => :on_blocking) do
    header = @rio.read(4)
    if header
      payload_len = header.unpack("I").first
      Marshal.load(@rio.read(payload_len))
    else
      []
    end
  end
rescue IOError
  raise StopIteration
end

#send(obj) ⇒ Object



62
63
64
65
66
67
68
69
70
71
# File 'lib/concurrent_worker/common.rb', line 62

# Marshals +obj+ and writes it to @wio as one length-prefixed message: a
# size header packed with the "I" directive, followed by the Marshal data.
#
# Both writes run under Thread.handle_interrupt(Object => :never) so that
# asynchronous interrupts are deferred until the whole frame is written.
#
# Returns the result of the final write, or nil when the write raises
# Errno::EPIPE (that error is deliberately swallowed).
def send(obj)
  Thread.handle_interrupt(Object => :never) do
    payload = Marshal.dump(obj)
    header = [payload.size].pack("I")
    @wio.write(header)
    @wio.write(payload)
  end
rescue Errno::EPIPE
  nil
end