Class: MarshalStream

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/aggro/marshal_stream.rb

Overview

Private: Wrapper around an IO object to read/write Marshaled objects.

Defined Under Namespace

Classes: StreamError

Constant Summary collapse

DEFAULT_MAX_OUTBOX =
10

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(io, max_outbox: DEFAULT_MAX_OUTBOX) ⇒ MarshalStream

Returns a new instance of MarshalStream.



12
13
14
15
16
17
# File 'lib/aggro/marshal_stream.rb', line 12

# Builds a stream around the given IO object.
#
# io         - The object that marshaled data is read from / written to.
# max_outbox - Threshold stored for the outbox size
#              (default: DEFAULT_MAX_OUTBOX).
#
# The inbox and outbox both start out as empty arrays.
def initialize(io, max_outbox: DEFAULT_MAX_OUTBOX)
  @io, @max_outbox = io, max_outbox
  @inbox, @outbox = [], []
end

Instance Attribute Details

#io ⇒ Object (readonly)

Returns the value of attribute io.



9
10
11
# File 'lib/aggro/marshal_stream.rb', line 9

# Returns the value of @io, the object handed to the constructor that this
# stream wraps.
def io
  @io
end

#max_outbox ⇒ Object (readonly)

Returns the value of attribute max_outbox.



10
11
12
# File 'lib/aggro/marshal_stream.rb', line 10

# Returns the value of @max_outbox. write_to_outbox flushes the outbox once
# its size is greater than this value.
def max_outbox
  @max_outbox
end

Instance Method Details

#close ⇒ Object



19
20
21
22
# File 'lib/aggro/marshal_stream.rb', line 19

# Flushes any pending outbox entries and then closes the underlying IO.
#
# The IO is closed even when flushing fails, so a failed flush does not leak
# the underlying handle; the flush error is re-raised afterwards.
#
# Returns whatever io.close returns.
# Raises whatever flush_outbox or io.close raise.
def close
  flush_outbox
rescue StandardError
  # Flushing failed: still release the IO, then surface the original error.
  io.close
  raise
else
  io.close
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


24
25
26
# File 'lib/aggro/marshal_stream.rb', line 24

# Reports whether the underlying IO is closed by delegating to io.closed?.
#
# Returns the result of io.closed?.
def closed?
  io.closed?
end

#each ⇒ Object



28
29
30
31
32
# File 'lib/aggro/marshal_stream.rb', line 28

# Yields every object obtained from read until eof reports true.
#
# Without a block, an Enumerator over this method is returned instead.
#
# Returns nil once the stream is exhausted (block form).
def each
  if block_given?
    until eof
      read { |item| yield item }
    end
  else
    to_enum
  end
end

#eof? ⇒ Boolean Also known as: eof

Returns:

  • (Boolean)


34
35
36
# File 'lib/aggro/marshal_stream.rb', line 34

# Reports whether there is nothing left to read: the inbox holds no
# buffered objects and the underlying IO is at end-of-file.
#
# The IO is only consulted when the inbox is empty.
def eof?
  return false unless inbox.empty?

  io.eof?
end

#flush_buffer ⇒ Object



40
41
42
# File 'lib/aggro/marshal_stream.rb', line 40

# Does nothing besides returning self. #write calls this after
# #write_to_buffer.
#
# Returns self.
def flush_buffer
  self
end

#flush_outbox ⇒ Object



44
45
46
47
48
49
# File 'lib/aggro/marshal_stream.rb', line 44

# Writes every queued outbox entry to the stream, then empties the outbox.
#
# Entries that are Procs (queued via the block form of write_to_outbox) are
# called at flush time and their return value is written; any other entry
# is written as-is.
#
# Returns self.
def flush_outbox
  outbox.each do |entry|
    # Parentheses are required: `entry.is_a? Proc ? a : b` would parse as
    # `entry.is_a?(Proc ? a : b)` and pass the ternary as the argument.
    payload = entry.is_a?(Proc) ? entry.call : entry
    write_to_stream(payload)
  end
  outbox.clear

  self
end

#read ⇒ Object



51
52
53
54
55
56
57
58
59
60
# File 'lib/aggro/marshal_stream.rb', line 51

# Reads objects from the stream.
#
# With a block, passes the block whatever read_from_inbox yields and then
# whatever read_from_stream yields, and returns nil. Without a block,
# returns the result of read_one.
def read
  return read_one unless block_given?

  read_from_inbox { |item| yield item }
  read_from_stream { |item| yield item }

  nil
end

#read_from_stream ⇒ Object



62
63
64
65
66
67
68
# File 'lib/aggro/marshal_stream.rb', line 62

# Loads one marshaled object from the underlying IO and yields it.
#
# NOTE(review): Marshal.load can instantiate arbitrary objects; this should
# only be pointed at trusted streams.
#
# Yields the object loaded from io.
#
# Returns the value of the block.
# Raises IOError or SystemCallError unchanged when they occur.
# Raises StreamError for any other StandardError raised while loading the
#   object or while running the block.
def read_from_stream
  yield Marshal.load(io)
rescue IOError, SystemCallError
  # I/O-level failures (including end-of-file) propagate untouched.
  raise
rescue => e
  raise StreamError, "Unreadable stream: #{e}"
end

#read_one ⇒ Object



70
71
72
73
74
75
76
77
78
# File 'lib/aggro/marshal_stream.rb', line 70

# Returns a single object.
#
# A buffered inbox entry is returned first when one exists. Otherwise read
# is called repeatedly until it has yielded a non-nil object; that object is
# returned and any further objects yielded by the same read call are
# appended to the inbox for later.
def read_one
  return inbox.shift unless inbox.empty?

  first = nil

  while first.nil?
    read do |item|
      if first.nil?
        # Still looking for the object to return (nil objects are skipped).
        first = item
      else
        inbox << item
      end
    end
  end

  first
end

#to_io ⇒ Object



80
81
82
# File 'lib/aggro/marshal_stream.rb', line 80

# Returns the wrapped IO object (same value as #io).
# NOTE(review): presumably provided so the stream can be used where Ruby
# implicitly converts via to_io (e.g. IO.select) — confirm against callers.
def to_io
  io
end

#write(*objects) ⇒ Object Also known as: <<



84
85
86
87
# File 'lib/aggro/marshal_stream.rb', line 84

# Writes the given objects by passing them to write_to_buffer and then
# calling flush_buffer.
#
# objects - Zero or more objects to write.
#
# Returns the result of flush_buffer.
def write(*objects)
  write_to_buffer(*objects)
  flush_buffer
end

#write_to_buffer(*objects) ⇒ Object



91
92
93
94
95
96
# File 'lib/aggro/marshal_stream.rb', line 91

# Flushes the outbox, then writes each given object to the stream in order.
#
# objects - Zero or more objects to write.
#
# Returns self.
def write_to_buffer(*objects)
  flush_outbox

  objects.each do |item|
    write_to_stream(item)
  end

  self
end

#write_to_outbox(object = nil, &block) ⇒ Object



98
99
100
101
102
103
104
# File 'lib/aggro/marshal_stream.rb', line 98

# Queues an entry in the outbox instead of writing it immediately.
#
# object - The object to queue (default: nil).
# block  - Optional block; when given it is queued in place of object.
#
# The outbox is flushed as soon as its size is greater than max_outbox.
#
# Returns self.
def write_to_outbox(object = nil, &block)
  entry = block.nil? ? object : block
  outbox.push(entry)

  flush_outbox unless outbox.size <= max_outbox

  self
end

#write_to_stream(object) ⇒ Object



106
107
108
109
110
# File 'lib/aggro/marshal_stream.rb', line 106

# Marshals the given object directly onto the underlying IO.
#
# object - The object to dump with Marshal.
#
# Returns self.
def write_to_stream(object)
  tap { Marshal.dump(object, io) }
end