Module: ObjectStream

Includes: Enumerable
Included in: JsonStream, MarshalStream, MsgpackStream, YamlStream
Defined in: lib/object-stream.rb

Overview

A stream of objects over any underlying IO: File, pipe, Socket, or StringIO. The stream is bidirectional if the IO is bidirectional.

Serializes objects using any of several serializers: marshal, yaml, json, msgpack. Works with select/readpartial if the serializer supports it (msgpack and yajl do).

ObjectStream supports three styles of iteration: Enumerable, blocking read, and yielding (non-blocking) read.
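
The Enumerable and blocking-read styles can be illustrated without the library itself. The following is a minimal sketch using only Ruby's standard Marshal and StringIO. `TinyMarshalStream` is a hypothetical stand-in, not part of ObjectStream, and it leaves out the inbox, the outbox, and the yielding (non-blocking) read.

```ruby
require "stringio"

# Hypothetical stand-in for illustration only; not the ObjectStream API.
class TinyMarshalStream
  include Enumerable

  def initialize(io)
    @io = io
  end

  # Serialize each object onto the underlying IO.
  def write(*objects)
    objects.each { |object| Marshal.dump(object, @io) }
    self
  end

  # Blocking read of a single object; raises EOFError at end of stream.
  def read_one
    Marshal.load(@io)
  end

  # Enumerable style: yields objects until the IO is exhausted.
  def each
    return to_enum unless block_given?
    yield read_one until @io.eof?
  end
end

io = StringIO.new
stream = TinyMarshalStream.new(io)
stream.write({ "id" => 1 }, [2, 3], "four")
io.rewind

first = stream.read_one   # blocking style: one object at a time
rest  = stream.to_a       # Enumerable style: the rest of the stream
```

Because the sketch includes Enumerable, methods such as `map`, `select`, and `first` work on whatever remains in the stream, in the same spirit as the Enumerable style described above.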

Defined Under Namespace

Classes: JsonStream, MarshalStream, MsgpackStream, OverflowError, YamlStream

Constant Summary

VERSION = "0.3"
MARSHAL_TYPE = "marshal".freeze
YAML_TYPE = "yaml".freeze
JSON_TYPE = "json".freeze
MSGPACK_TYPE = "msgpack".freeze
TYPES = [
  MARSHAL_TYPE, YAML_TYPE, JSON_TYPE, MSGPACK_TYPE
]
DEFAULT_MAX_OUTBOX = 10


Instance Attribute Details

#io ⇒ Object (readonly)

The IO through which the stream reads and writes serialized object data.



# File 'lib/object-stream.rb', line 16

def io
  @io
end

#max_outbox ⇒ Object (readonly)

Number of outgoing objects that can accumulate before the outbox is serialized to the byte buffer (and possibly to the io).



# File 'lib/object-stream.rb', line 20

def max_outbox
  @max_outbox
end

Class Method Details

.new(io, type: MARSHAL_TYPE, **opts) ⇒ Object



# File 'lib/object-stream.rb', line 41

def new io, type: MARSHAL_TYPE, **opts
  if io.kind_of? ObjectStream
    raise ArgumentError,
      "given io is already an ObjectStream: #{io.inspect}"
  end
  stream_class_for(type).new io, **opts
end

.register_type(type, &bl) ⇒ Object



# File 'lib/object-stream.rb', line 61

def register_type type, &bl
  @stream_class_map[type] = bl
end

.stream_class_for(type) ⇒ Object



# File 'lib/object-stream.rb', line 49

def stream_class_for type
  cl = @stream_class_map[type]
  return cl if cl.respond_to? :new

  # Protect against race condition in msgpack and yajl extension
  # initialization (bug #8374).
  @mutex.synchronize do
    return cl if cl.respond_to? :new
    @stream_class_map[type] = cl.call
  end
end
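
The three class methods above cooperate as a lazy registry: `register_type` stores a block, and `stream_class_for` calls that block once, under a mutex, and caches the resulting class. A minimal sketch of that pattern follows. `LazyRegistry` and `class_for` are hypothetical names, the error raised for an unknown type is an assumption, and the sketch re-reads the map entry inside the lock, which the listing above does not do.

```ruby
# Hypothetical registry mirroring the lazy-lookup pattern; names are illustrative.
class LazyRegistry
  def initialize
    @map = {}
    @mutex = Mutex.new
  end

  # Store a block that builds (or loads) the class on first use.
  def register_type(type, &bl)
    @map[type] = bl
  end

  def class_for(type)
    cl = @map[type]
    # Assumption: fail loudly on unknown types.
    raise ArgumentError, "unknown type: #{type.inspect}" if cl.nil?
    return cl if cl.respond_to?(:new)   # fast path: already resolved

    @mutex.synchronize do
      cl = @map[type]                   # re-read under the lock
      return cl if cl.respond_to?(:new)
      @map[type] = cl.call              # resolve once and cache
    end
  end
end

calls = 0
registry = LazyRegistry.new
registry.register_type("string") { calls += 1; String }

klass = registry.class_for("string")   # runs the block
again = registry.class_for("string")   # served from the cache
```

Deferring the block keeps optional serializer dependencies from being loaded until a stream of that type is actually requested.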

Instance Method Details

#close ⇒ Object

Call this if the most recent write was a #write_to_buffer without a #flush_buffer. If you only use #write, there’s no need to close the stream in any special way.



# File 'lib/object-stream.rb', line 175

def close
  flush_outbox
  io.close
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/object-stream.rb', line 180

def closed?
  io.closed?
end

#each ⇒ Object

Iterate through the rest of the stream of objects. Does not raise EOFError at the end of the stream, but simply returns. All Enumerable and Enumerator methods are available.



# File 'lib/object-stream.rb', line 161

def each
  return to_enum unless block_given?
  read {|obj| yield obj} until eof
rescue EOFError
end

#eof? ⇒ Boolean Also known as: eof

Returns:

  • (Boolean)


# File 'lib/object-stream.rb', line 167

def eof?
  (!@inbox || @inbox.empty?) && io.eof?
end

#flush_buffer ⇒ Object



# File 'lib/object-stream.rb', line 154

def flush_buffer
  self
end

#flush_outbox ⇒ Object



# File 'lib/object-stream.rb', line 137

def flush_outbox
  @outbox.each do |object|
    object = object.call if object.kind_of? Proc
    write_to_stream object
  end
  @outbox.clear
  self
end

#initialize(io, max_outbox: DEFAULT_MAX_OUTBOX, **opts) ⇒ Object



# File 'lib/object-stream.rb', line 66

def initialize io, max_outbox: DEFAULT_MAX_OUTBOX, **opts
  @io = io
  @max_outbox = max_outbox
  @inbox = nil
  @outbox = []
end

#read ⇒ Object

If no block is given, behaves the same as #read_one. If a block is given, reads any available data and yields each decoded object to the block. This form is non-blocking if the underlying serializer supports it (msgpack, for example).



# File 'lib/object-stream.rb', line 80

def read
  if block_given?
    read_from_inbox {|obj| yield obj}
    read_from_stream {|obj| yield obj}
    return nil
  else
    read_one
  end
end

#read_one ⇒ Object

Read one object from the stream, blocking if necessary. Returns the object. Raises EOFError at the end of the stream.



# File 'lib/object-stream.rb', line 92

def read_one
  if @inbox and not @inbox.empty?
    return @inbox.shift
  end
  
  have_result = false
  result = nil
  until have_result
    read do |obj| # might not read enough bytes to yield an obj
      if have_result
        (@inbox||=[]) << obj
      else
        have_result = true
        result = obj
      end
    end
  end
  result
end
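
The loop in #read_one exists because a single yielding read may decode no objects at all, or several at once. The sketch below reproduces that control flow against simulated input. `ChunkedReader` and its `chunks` argument are hypothetical: each inner array stands for the objects decoded by one underlying read.

```ruby
# Hypothetical reader for illustration: `chunks` simulates successive
# reads, each of which may decode zero or more objects.
class ChunkedReader
  def initialize(chunks)
    @chunks = chunks
    @inbox = []
  end

  # Yielding read: drain the inbox, then yield whatever the next read decoded.
  def read
    @inbox.shift(@inbox.size).each { |obj| yield obj }
    raise EOFError, "end of stream" if @chunks.empty?
    @chunks.shift.each { |obj| yield obj }
    nil
  end

  # Blocking-style read: loop until one object arrives, queue any surplus.
  def read_one
    return @inbox.shift unless @inbox.empty?

    have_result = false
    result = nil
    until have_result
      read do |obj|
        if have_result
          @inbox << obj        # surplus objects wait for the next call
        else
          have_result = true
          result = obj
        end
      end
    end
    result
  end
end

# First read decodes nothing, second decodes three objects, third decodes one.
reader = ChunkedReader.new([[], [:a, :b, :c], [:d]])
values = 4.times.map { reader.read_one }
```

The empty first chunk forces a second pass through the loop, and the surplus `:b` and `:c` are served from the inbox on later calls without touching the input again.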

#to_io ⇒ Object

Makes it possible to pass the stream itself to select.



# File 'lib/object-stream.rb', line 185

def to_io
  io
end
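
IO.select accepts any object that responds to #to_io and returns that same object when the underlying IO is ready. A minimal sketch with a pipe follows; `WrappedStream` is a hypothetical wrapper standing in for a stream, not part of ObjectStream.

```ruby
# Hypothetical wrapper for illustration; only #to_io matters to IO.select.
class WrappedStream
  def initialize(io)
    @io = io
  end

  def to_io
    @io
  end
end

reader, writer = IO.pipe
stream = WrappedStream.new(reader)

idle = IO.select([stream], nil, nil, 0)     # nothing written yet, times out
writer.write("ping")
writer.flush
ready, = IO.select([stream], nil, nil, 1)   # the wrapper itself is returned
data = stream.to_io.read_nonblock(4)

writer.close
reader.close
```

In an event loop this means the objects returned by select can be read from directly, with no lookup table from raw IO back to stream.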

#to_s ⇒ Object



# File 'lib/object-stream.rb', line 73

def to_s
  "#<#{self.class} io=#{io.inspect}>"
end

#write(*objects) ⇒ Object Also known as: <<

Write the given objects to the stream, first flushing any objects in the outbox. Flushes the underlying byte buffer afterwards.



# File 'lib/object-stream.rb', line 122

def write *objects
  write_to_buffer *objects
  flush_buffer
end

#write_to_buffer(*objects) ⇒ Object



# File 'lib/object-stream.rb', line 146

def write_to_buffer *objects
  flush_outbox
  objects.each do |object|
    write_to_stream object
  end
  self
end

#write_to_outbox(object = nil, &bl) ⇒ Object

Push the given object into the outbox, to be written later when the outbox is flushed. If a block is given, it will be called when the outbox is flushed, and its value will be written instead.



# File 'lib/object-stream.rb', line 131

def write_to_outbox object=nil, &bl
  @outbox << (bl || object)
  flush_outbox if @outbox.size > max_outbox
  self
end
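
The outbox behavior described above, including deferred blocks and the flush that fires once the queue grows past max_outbox, can be sketched in isolation. `TinyOutbox` is a hypothetical class, and `sink` is an assumed stand-in for the serializing write; here it is just an Array.

```ruby
# Hypothetical outbox for illustration; `sink` stands in for the serializer.
class TinyOutbox
  def initialize(sink, max_outbox: 10)
    @sink = sink
    @max_outbox = max_outbox
    @outbox = []
  end

  # Queue an object, or a block whose value is computed at flush time.
  def push(object = nil, &bl)
    @outbox << (bl || object)
    flush if @outbox.size > @max_outbox
    self
  end

  # Write every queued entry in order, calling deferred blocks first.
  def flush
    @outbox.each do |entry|
      entry = entry.call if entry.is_a?(Proc)
      @sink << entry
    end
    @outbox.clear
    self
  end
end

sent = []
counter = 0
outbox = TinyOutbox.new(sent, max_outbox: 2)

outbox.push(:a)
outbox.push { counter }    # deferred: evaluated when flushed, not now
counter = 42
pending_before = sent.dup  # nothing written yet, threshold not exceeded
outbox.push(:c)            # third entry exceeds max_outbox, so flush runs
```

Note that the comparison is strictly greater than, so with the default of 10 the flush happens when the eleventh entry is queued. The deferred block sees the value of `counter` at flush time, which is what makes it useful for data that should be as fresh as possible when written.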