Module: ObjectStream

Includes:
Enumerable
Included in:
JsonStream, MarshalStream, MsgpackStream, YamlStream
Defined in:
lib/object-stream.rb

Overview

Stream of objects, with any underlying IO: File, Pipe, Socket, StringIO. Stream is bidirectional if the IO is bidirectional.

Serializes objects using any of several serializers: marshal, yaml, json, msgpack. Works with select/readpartial if the serializer supports it (msgpack and yajl do).

ObjectStream supports three styles of iteration: Enumerable, blocking read, and yielding (non-blocking) read.

Defined Under Namespace

Classes: JsonStream, MarshalStream, MsgpackStream, OverflowError, StreamError, YamlStream

Constant Summary collapse

VERSION =
"0.8"
MARSHAL_TYPE =
"marshal".freeze
YAML_TYPE =
"yaml".freeze
JSON_TYPE =
"json".freeze
MSGPACK_TYPE =
"msgpack".freeze
TYPES =
[
  MARSHAL_TYPE, YAML_TYPE, JSON_TYPE, MSGPACK_TYPE
]
DEFAULT_MAX_OUTBOX =
10

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#ioObject (readonly)

The IO through which the stream reads and writes serialized object data.



16
17
18
# File 'lib/object-stream.rb', line 16

def io
  @io
end

#max_outboxObject (readonly)

Number of outgoing objects that can accumulate before the outbox is serialized to the byte buffer (and possibly to the io).



20
21
22
# File 'lib/object-stream.rb', line 20

def max_outbox
  @max_outbox
end

Class Method Details

.new(io, type: MARSHAL_TYPE, **opts) ⇒ Object



44
45
46
47
48
49
50
# File 'lib/object-stream.rb', line 44

def new io, type: MARSHAL_TYPE, **opts
  if io.kind_of? ObjectStream
    raise ArgumentError,
      "given io is already an ObjectStream: #{io.inspect}"
  end
  stream_class_for(type).new io, **opts
end

.register_type(type, &bl) ⇒ Object



64
65
66
# File 'lib/object-stream.rb', line 64

def register_type type, &bl
  @stream_class_map[type] = bl
end

.stream_class_for(type) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
# File 'lib/object-stream.rb', line 52

def stream_class_for type
  cl = @stream_class_map[type]
  return cl if cl.respond_to? :new

  # Protect against race condition in msgpack and yajl extension
  # initialization (bug #8374).
  @mutex.synchronize do
    return cl if cl.respond_to? :new
    @stream_class_map[type] = cl.call
  end
end

Instance Method Details

#checked_read_from_streamObject



93
94
95
96
97
98
99
# File 'lib/object-stream.rb', line 93

def checked_read_from_stream
  read_from_stream {|obj| yield obj}
rescue IOError, SystemCallError, OverflowError
  raise
rescue => ex
  raise StreamError, "unreadble stream: #{ex}"
end

#closeObject

Call this if the most recent write was a #write_to_buffer without a #flush_buffer. If you only use #write, there’s no need to close the stream in any special way.



186
187
188
189
# File 'lib/object-stream.rb', line 186

def close
  flush_outbox
  io.close
end

#closed?Boolean

Returns:

  • (Boolean)


191
192
193
# File 'lib/object-stream.rb', line 191

def closed?
  io.closed?
end

#eachObject

Iterate through the (rest of) the stream of objects. Does not raise EOFError, but simply returns. All Enumerable and Enumerator methods are available.



172
173
174
175
176
# File 'lib/object-stream.rb', line 172

def each
  return to_enum unless block_given?
  read {|obj| yield obj} until eof
rescue EOFError
end

#eof?Boolean Also known as: eof

Returns:

  • (Boolean)


178
179
180
# File 'lib/object-stream.rb', line 178

def eof?
  (!@inbox || @inbox.empty?) && io.eof?
end

#flush_bufferObject



165
166
167
# File 'lib/object-stream.rb', line 165

def flush_buffer
  self
end

#flush_outboxObject



148
149
150
151
152
153
154
155
# File 'lib/object-stream.rb', line 148

def flush_outbox
  @outbox.each do |object|
    object = object.call if object.kind_of? Proc
    write_to_stream object
  end
  @outbox.clear
  self
end

#initialize(io, max_outbox: DEFAULT_MAX_OUTBOX, **opts) ⇒ Object



69
70
71
72
73
74
# File 'lib/object-stream.rb', line 69

def initialize io, max_outbox: DEFAULT_MAX_OUTBOX, **opts
  @io = io
  @max_outbox = max_outbox
  @inbox = nil
  @outbox = []
end

#readObject

If no block given, behaves just the same as #read_one. If block given, reads any available data and yields it to the block. This form is non- blocking, if supported by the underlying serializer (such as msgpack).



83
84
85
86
87
88
89
90
91
# File 'lib/object-stream.rb', line 83

def read
  if block_given?
    read_from_inbox {|obj| yield obj}
    checked_read_from_stream {|obj| yield obj}
    return nil
  else
    read_one
  end
end

#read_oneObject

Read one object from the stream, blocking if necessary. Returns the object. Raises EOFError at the end of the stream.



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/object-stream.rb', line 103

def read_one
  if @inbox and not @inbox.empty?
    return @inbox.shift
  end

  have_result = false
  result = nil
  until have_result
    read do |obj| # might not read enough bytes to yield an obj
      if have_result
        (@inbox||=[]) << obj
      else
        have_result = true
        result = obj
      end
    end
  end
  result
end

#to_ioObject

Makes it possible to use stream in a select.



196
197
198
# File 'lib/object-stream.rb', line 196

def to_io
  io
end

#to_sObject



76
77
78
# File 'lib/object-stream.rb', line 76

def to_s
  "#<#{self.class} io=#{io.inspect}>"
end

#write(*objects) ⇒ Object Also known as: <<

Write the given objects to the stream, first flushing any objects in the outbox. Flushes the underlying byte buffer afterwards.



133
134
135
136
# File 'lib/object-stream.rb', line 133

def write *objects
  write_to_buffer(*objects)
  flush_buffer
end

#write_to_buffer(*objects) ⇒ Object



157
158
159
160
161
162
163
# File 'lib/object-stream.rb', line 157

def write_to_buffer *objects
  flush_outbox
  objects.each do |object|
    write_to_stream object
  end
  self
end

#write_to_outbox(object = nil, &bl) ⇒ Object

Push the given object into the outbox, to be written later when the outbox is flushed. If a block is given, it will be called when the outbox is flushed, and its value will be written instead.



142
143
144
145
146
# File 'lib/object-stream.rb', line 142

def write_to_outbox object=nil, &bl
  @outbox << (bl || object)
  flush_outbox if @outbox.size > max_outbox
  self
end