Class: BioDSL::Stream

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/BioDSL/stream.rb

Overview

Class for inter-process communication (IPC) between forked processes, using msgpack to serialize and deserialize objects.

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(io) ⇒ Stream

Returns a new instance of Stream.



47
48
49
# File 'lib/BioDSL/stream.rb', line 47

# Wrap an IO-like object so the stream's other methods can operate on it.
#
# @param io [IO] Object the stream reads from, writes to and closes; stored
#   as-is without being inspected or modified.
#
# @return [Stream] Returns a new instance of Stream.
def initialize(io)
  @io = io
end

Class Method Details

.pipe ⇒ Object

Creates a pair of connected pipe endpoints. The connection uses msgpack, allowing objects to be written and read.

Stream.pipe -> [read_io, write_io]



41
42
43
44
45
# File 'lib/BioDSL/stream.rb', line 41

# Create a pair of connected pipe endpoints, each wrapped by calling +new+.
#
#   Stream.pipe -> [read_io, write_io]
#
# The pipe is opened with Encoding::BINARY as its encoding argument.
#
# @return [Array] Two-element array: the wrapped read end followed by the
#   wrapped write end.
def self.pipe
  IO.pipe(Encoding::BINARY).map { |endpoint| new(endpoint) }
end

Instance Method Details

#close ⇒ Object



51
52
53
# File 'lib/BioDSL/stream.rb', line 51

# Close the underlying IO.
#
# @return [Object] Whatever the wrapped IO's #close returns.
def close
  @io.close
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


55
56
57
# File 'lib/BioDSL/stream.rb', line 55

# Report whether the underlying IO has been closed.
#
# @return [Boolean] The result of calling #closed? on the wrapped IO.
def closed?
  @io.closed?
end

#each ⇒ Object



59
60
61
# File 'lib/BioDSL/stream.rb', line 59

# Iterate over the objects remaining in the stream.
#
# Objects are fetched one at a time with #read until the underlying IO
# reports end-of-file.
#
# @yield [obj] Each object returned by #read, in the order it is read.
#
# @return [nil, Enumerator] nil when a block is given; an Enumerator over the
#   remaining objects when called without a block.
def each
  # Without this guard a block-less call reaches `yield` and raises
  # LocalJumpError; returning an Enumerator matches the usual #each contract.
  return to_enum(:each) unless block_given?

  yield read until @io.eof?
end

#read ⇒ Object



63
64
65
66
67
68
69
# File 'lib/BioDSL/stream.rb', line 63

# Read and deserialize the next object from the stream.
#
# A message is framed as a 4 byte length header (unpacked with the 'I'
# directive, i.e. a native unsigned int) followed by that many bytes of
# msgpack data.
#
# @return [Object] The object produced by MessagePack.unpack, called with
#   symbolize_keys: true.
#
# @raise [EOFError] If the stream is at end-of-file, or if it ends in the
#   middle of a length header or a payload.
def read
  header = @io.read(4)
  fail EOFError unless header
  # A short header would unpack to nil, and a nil length makes the next read
  # consume everything up to EOF instead of one message.
  fail EOFError, 'truncated message header' if header.bytesize < 4

  size = header.unpack('I').first
  msg  = @io.read(size)
  # A length-limited read returns nil or a short string when the stream ends
  # early; do not hand an incomplete payload to the deserializer.
  fail EOFError, 'truncated message payload' if msg.nil? || msg.bytesize < size

  MessagePack.unpack(msg, symbolize_keys: true)
end

#write(obj) ⇒ Object Also known as: <<



71
72
73
74
75
# File 'lib/BioDSL/stream.rb', line 71

# Serialize an object with msgpack and write it to the stream.
#
# The payload is preceded by its length, packed with the 'I' directive;
# this is the framing that #read consumes.
#
# @param obj [Object] Object handed to MessagePack.pack.
#
# @return [Object] The result of writing the payload to the wrapped IO
#   (for an IO, the number of payload bytes written).
def write(obj)
  msg = MessagePack.pack(obj)
  # The reader treats the prefix as a byte count, so measure bytes rather
  # than characters; the two only differ for non-binary multibyte strings.
  @io.write([msg.bytesize].pack('I'))
  @io.write(msg)
end