Class: ObjectStream::MsgpackStream

Inherits:
Object
  • Object
show all
Includes:
ObjectStream
Defined in:
lib/object-stream.rb

Constant Summary collapse

DEFAULT_CHUNK_SIZE =
2000
DEFAULT_MAXBUF =
4000

Constants included from ObjectStream

DEFAULT_MAX_OUTBOX, JSON_TYPE, MARSHAL_TYPE, MSGPACK_TYPE, TYPES, VERSION, YAML_TYPE

Instance Attribute Summary collapse

Attributes included from ObjectStream

#io, #max_outbox

Instance Method Summary collapse

Methods included from ObjectStream

#close, #closed?, #each, #eof?, #flush_outbox, new, #read, #read_one, register_type, stream_class_for, #to_io, #to_s, #write, #write_to_outbox

Constructor Details

#initialize(io, chunk_size: DEFAULT_CHUNK_SIZE, maxbuf: DEFAULT_MAXBUF) ⇒ MsgpackStream

Returns a new instance of MsgpackStream.



272
273
274
275
276
277
278
279
280
# File 'lib/object-stream.rb', line 272

def initialize io, chunk_size: DEFAULT_CHUNK_SIZE, maxbuf: DEFAULT_MAXBUF
  super
  @unpacker = MessagePack::Unpacker.new
    # don't specify io, so don't have to read all of io in one loop
  
  @packer = MessagePack::Packer.new(io)
  @chunk_size = chunk_size
  @maxbuf = maxbuf
end

Instance Attribute Details

#chunk_sizeObject

Returns the value of attribute chunk_size.



266
267
268
# File 'lib/object-stream.rb', line 266

def chunk_size
  @chunk_size
end

#maxbufObject

Returns the value of attribute maxbuf.



267
268
269
# File 'lib/object-stream.rb', line 267

def maxbuf
  @maxbuf
end

Instance Method Details

#checkbufObject



301
302
303
304
305
306
# File 'lib/object-stream.rb', line 301

def checkbuf
  if maxbuf and @unpacker.buffer.size > maxbuf
    raise OverflowError,
      "Exceeded buffer limit by #{@unpacker.buffer.size - maxbuf} bytes."
  end
end

#fill_buffer(n) ⇒ Object



291
292
293
# File 'lib/object-stream.rb', line 291

def fill_buffer n
  @unpacker.feed(io.readpartial(n))
end

#flush_bufferObject



321
322
323
324
# File 'lib/object-stream.rb', line 321

def flush_buffer
  @packer.flush
  self
end

#read_from_bufferObject



295
296
297
298
299
# File 'lib/object-stream.rb', line 295

def read_from_buffer
  @unpacker.each do |obj|
    yield obj
  end
end

#read_from_streamObject

Blocks only if no data available on io.



283
284
285
286
287
288
289
# File 'lib/object-stream.rb', line 283

def read_from_stream
  fill_buffer(chunk_size)
  checkbuf if maxbuf
  read_from_buffer do |obj|
    yield obj
  end
end

#write_to_buffer(*objects) ⇒ Object



313
314
315
316
317
318
319
# File 'lib/object-stream.rb', line 313

def write_to_buffer *objects
  flush_outbox
  objects.each do |object|
    @packer.write(object)
  end
  self
end

#write_to_stream(object) ⇒ Object



308
309
310
311
# File 'lib/object-stream.rb', line 308

def write_to_stream object
  @packer.write(object).flush
  self
end