Module: ObjectStream
- Includes:
- Enumerable
- Included in:
- JsonStream, MarshalStream, MsgpackStream, YamlStream
- Defined in:
- lib/object-stream.rb
Overview
Stream of objects, with any underlying IO: File, Pipe, Socket, StringIO. Stream is bidirectional if the IO is bidirectional.
Serializes objects using any of several serializers: marshal, yaml, json, msgpack. Works with select/readpartial if the serializer supports it (msgpack and yajl do).
ObjectStream supports three styles of iteration: Enumerable, blocking read, and yielding (non-blocking) read.
Defined Under Namespace
Classes: JsonStream, MarshalStream, MsgpackStream, OverflowError, YamlStream
Constant Summary collapse
- VERSION =
"0.3"
- MARSHAL_TYPE =
"marshal".freeze
- YAML_TYPE =
"yaml".freeze
- JSON_TYPE =
"json".freeze
- MSGPACK_TYPE =
"msgpack".freeze
- TYPES =
[ MARSHAL_TYPE, YAML_TYPE, JSON_TYPE, MSGPACK_TYPE ]
- DEFAULT_MAX_OUTBOX =
10
Instance Attribute Summary collapse
-
#io ⇒ Object
readonly
The IO through which the stream reads and writes serialized object data.
-
#max_outbox ⇒ Object
readonly
Number of outgoing objects that can accumulate before the outbox is serialized to the byte buffer (and possibly to the io).
Class Method Summary collapse
- .new(io, type: MARSHAL_TYPE, **opts) ⇒ Object
- .register_type(type, &bl) ⇒ Object
- .stream_class_for(type) ⇒ Object
Instance Method Summary collapse
-
#close ⇒ Object
Call this if the most recent write was a #write_to_buffer without a #flush_buffer.
- #closed? ⇒ Boolean
-
#each ⇒ Object
Iterate through the (rest of) the stream of objects.
- #eof? ⇒ Boolean (also: #eof)
- #flush_buffer ⇒ Object
- #flush_outbox ⇒ Object
- #initialize(io, max_outbox: DEFAULT_MAX_OUTBOX, **opts) ⇒ Object
-
#read ⇒ Object
If no block given, behaves just the same as #read_one.
-
#read_one ⇒ Object
Read one object from the stream, blocking if necessary.
-
#to_io ⇒ Object
Makes it possible to use stream in a select.
- #to_s ⇒ Object
-
#write(*objects) ⇒ Object
(also: #<<)
Write the given objects to the stream, first flushing any objects in the outbox.
- #write_to_buffer(*objects) ⇒ Object
-
#write_to_outbox(object = nil, &bl) ⇒ Object
Push the given object into the outbox, to be written later when the outbox is flushed.
Instance Attribute Details
#io ⇒ Object (readonly)
The IO through which the stream reads and writes serialized object data.
16 17 18 |
# File 'lib/object-stream.rb', line 16 def io @io end |
#max_outbox ⇒ Object (readonly)
Number of outgoing objects that can accumulate before the outbox is serialized to the byte buffer (and possibly to the io).
20 21 22 |
# File 'lib/object-stream.rb', line 20 def max_outbox @max_outbox end |
Class Method Details
.new(io, type: MARSHAL_TYPE, **opts) ⇒ Object
41 42 43 44 45 46 47 |
# File 'lib/object-stream.rb', line 41 def new io, type: MARSHAL_TYPE, **opts if io.kind_of? ObjectStream raise ArgumentError, "given io is already an ObjectStream: #{io.inspect}" end stream_class_for(type).new io, **opts end |
.register_type(type, &bl) ⇒ Object
61 62 63 |
# File 'lib/object-stream.rb', line 61 def register_type type, &bl @stream_class_map[type] = bl end |
.stream_class_for(type) ⇒ Object
49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/object-stream.rb', line 49 def stream_class_for type cl = @stream_class_map[type] return cl if cl.respond_to? :new # Protect against race condition in msgpack and yajl extension # initialization (bug #8374). @mutex.synchronize do return cl if cl.respond_to? :new @stream_class_map[type] = cl.call end end |
Instance Method Details
#close ⇒ Object
Call this if the most recent write was a #write_to_buffer without a #flush_buffer. If you only use #write, there’s no need to close the stream in any special way.
175 176 177 178 |
# File 'lib/object-stream.rb', line 175 def close flush_outbox io.close end |
#closed? ⇒ Boolean
180 181 182 |
# File 'lib/object-stream.rb', line 180 def closed? io.closed? end |
#each ⇒ Object
Iterate through the (rest of) the stream of objects. Does not raise EOFError, but simply returns. All Enumerable and Enumerator methods are available.
161 162 163 164 165 |
# File 'lib/object-stream.rb', line 161 def each return to_enum unless block_given? read {|obj| yield obj} until eof rescue EOFError end |
#eof? ⇒ Boolean Also known as: eof
167 168 169 |
# File 'lib/object-stream.rb', line 167 def eof? (!@inbox || @inbox.empty?) && io.eof? end |
#flush_buffer ⇒ Object
154 155 156 |
# File 'lib/object-stream.rb', line 154 def flush_buffer self end |
#flush_outbox ⇒ Object
137 138 139 140 141 142 143 144 |
# File 'lib/object-stream.rb', line 137 def flush_outbox @outbox.each do |object| object = object.call if object.kind_of? Proc write_to_stream object end @outbox.clear self end |
#initialize(io, max_outbox: DEFAULT_MAX_OUTBOX, **opts) ⇒ Object
66 67 68 69 70 71 |
# File 'lib/object-stream.rb', line 66 def initialize io, max_outbox: DEFAULT_MAX_OUTBOX, **opts @io = io @max_outbox = max_outbox @inbox = nil @outbox = [] end |
#read ⇒ Object
If no block given, behaves just the same as #read_one. If block given, reads any available data and yields it to the block. This form is non- blocking, if supported by the underlying serializer (such as msgpack).
80 81 82 83 84 85 86 87 88 |
# File 'lib/object-stream.rb', line 80 def read if block_given? read_from_inbox {|obj| yield obj} read_from_stream {|obj| yield obj} return nil else read_one end end |
#read_one ⇒ Object
Read one object from the stream, blocking if necessary. Returns the object. Raises EOFError at the end of the stream.
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/object-stream.rb', line 92 def read_one if @inbox and not @inbox.empty? return @inbox.shift end have_result = false result = nil until have_result read do |obj| # might not read enough bytes to yield an obj if have_result (@inbox||=[]) << obj else have_result = true result = obj end end end result end |
#to_io ⇒ Object
Makes it possible to use stream in a select.
185 186 187 |
# File 'lib/object-stream.rb', line 185 def to_io io end |
#to_s ⇒ Object
73 74 75 |
# File 'lib/object-stream.rb', line 73 def to_s "#<#{self.class} io=#{io.inspect}>" end |
#write(*objects) ⇒ Object Also known as: <<
Write the given objects to the stream, first flushing any objects in the outbox. Flushes the underlying byte buffer afterwards.
122 123 124 125 |
# File 'lib/object-stream.rb', line 122 def write *objects write_to_buffer *objects flush_buffer end |
#write_to_buffer(*objects) ⇒ Object
146 147 148 149 150 151 152 |
# File 'lib/object-stream.rb', line 146 def write_to_buffer *objects flush_outbox objects.each do |object| write_to_stream object end self end |
#write_to_outbox(object = nil, &bl) ⇒ Object
Push the given object into the outbox, to be written later when the outbox is flushed. If a block is given, it will be called when the outbox is flushed, and its value will be written instead.
131 132 133 134 135 |
# File 'lib/object-stream.rb', line 131 def write_to_outbox object=nil, &bl @outbox << (bl || object) flush_outbox if @outbox.size > max_outbox self end |