Class: Async::IO::Stream
- Inherits:
-
Object
- Object
- Async::IO::Stream
- Defined in:
- lib/async/io/stream.rb
Constant Summary collapse
- BLOCK_SIZE =
IO::BLOCK_SIZE
Instance Attribute Summary collapse
-
#block_size ⇒ Object
readonly
Returns the value of attribute block_size.
-
#io ⇒ Object
readonly
Returns the value of attribute io.
Class Method Summary collapse
Instance Method Summary collapse
-
#<<(string) ⇒ Object
Writes
stringto the stream and returns self. -
#close ⇒ Object
Best effort to flush any unwritten data, and then close the underling IO.
- #close_read ⇒ Object
- #close_write ⇒ Object
- #closed? ⇒ Boolean
- #connected? ⇒ Boolean
- #eof! ⇒ Object
-
#eof? ⇒ Boolean
(also: #eof)
Returns true if the stream is at file which means there is no more data to be read.
-
#flush ⇒ Object
Flushes buffered data to the stream.
- #gets(separator = $/, **options) ⇒ Object
-
#initialize(io, block_size: BLOCK_SIZE, maximum_read_size: MAXIMUM_READ_SIZE, sync: true) ⇒ Stream
constructor
A new instance of Stream.
- #peek ⇒ Object
- #puts(*args, separator: $/) ⇒ Object
-
#read(size = nil) ⇒ Object
Reads
sizebytes from the stream. - #read_exactly(size, exception: EOFError) ⇒ Object
-
#read_partial(size = nil) ⇒ Object
(also: #readpartial)
Read at most
sizebytes from the stream. -
#read_until(pattern, offset = 0, chomp: true) ⇒ String
Efficiently read data from the stream until encountering pattern.
-
#write(string) ⇒ Object
Writes
stringto the buffer.
Constructor Details
#initialize(io, block_size: BLOCK_SIZE, maximum_read_size: MAXIMUM_READ_SIZE, sync: true) ⇒ Stream
Returns a new instance of Stream.
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/async/io/stream.rb', line 41 def initialize(io, block_size: BLOCK_SIZE, maximum_read_size: MAXIMUM_READ_SIZE, sync: true) @io = io @eof = false # We don't want Ruby to do any IO buffering. @io.sync = sync @block_size = block_size @maximum_read_size = maximum_read_size @read_buffer = Buffer.new @write_buffer = Buffer.new # Used as destination buffer for underlying reads. @input_buffer = Buffer.new end |
Instance Attribute Details
#block_size ⇒ Object (readonly)
Returns the value of attribute block_size.
59 60 61 |
# File 'lib/async/io/stream.rb', line 59 def block_size @block_size end |
#io ⇒ Object (readonly)
Returns the value of attribute io.
58 59 60 |
# File 'lib/async/io/stream.rb', line 58 def io @io end |
Class Method Details
.open(path, mode = "r+", **options) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/async/io/stream.rb', line 29 def self.open(path, mode = "r+", **) stream = self.new(File.open(path, mode), **) return stream unless block_given? begin yield stream ensure stream.close end end |
Instance Method Details
#<<(string) ⇒ Object
Writes string to the stream and returns self.
159 160 161 162 163 |
# File 'lib/async/io/stream.rb', line 159 def <<(string) write(string) return self end |
#close ⇒ Object
Best effort to flush any unwritten data, and then close the underling IO.
204 205 206 207 208 209 210 211 212 213 214 |
# File 'lib/async/io/stream.rb', line 204 def close return if @io.closed? begin flush rescue # We really can't do anything here unless we want #close to raise exceptions. ensure @io.close end end |
#close_read ⇒ Object
193 194 195 |
# File 'lib/async/io/stream.rb', line 193 def close_read @io.close_read end |
#close_write ⇒ Object
197 198 199 200 201 |
# File 'lib/async/io/stream.rb', line 197 def close_write flush ensure @io.close_write end |
#closed? ⇒ Boolean
189 190 191 |
# File 'lib/async/io/stream.rb', line 189 def closed? @io.closed? end |
#connected? ⇒ Boolean
185 186 187 |
# File 'lib/async/io/stream.rb', line 185 def connected? @io.connected? end |
#eof! ⇒ Object
225 226 227 228 229 230 |
# File 'lib/async/io/stream.rb', line 225 def eof! @read_buffer.clear @eof = true raise EOFError end |
#eof? ⇒ Boolean Also known as: eof
Returns true if the stream is at file which means there is no more data to be read.
217 218 219 220 221 |
# File 'lib/async/io/stream.rb', line 217 def eof? fill_read_buffer if !@eof && @read_buffer.empty? return @eof && @read_buffer.empty? end |
#flush ⇒ Object
Flushes buffered data to the stream.
166 167 168 169 170 171 |
# File 'lib/async/io/stream.rb', line 166 def flush unless @write_buffer.empty? @io.write(@write_buffer) @write_buffer.clear end end |
#gets(separator = $/, **options) ⇒ Object
173 174 175 |
# File 'lib/async/io/stream.rb', line 173 def gets(separator = $/, **) read_until(separator, **) end |
#peek ⇒ Object
133 134 135 136 137 |
# File 'lib/async/io/stream.rb', line 133 def peek until yield(@read_buffer) or @eof fill_read_buffer end end |
#puts(*args, separator: $/) ⇒ Object
177 178 179 180 181 182 183 |
# File 'lib/async/io/stream.rb', line 177 def puts(*args, separator: $/) args.each do |arg| @write_buffer << arg << separator end flush end |
#read(size = nil) ⇒ Object
Reads size bytes from the stream. If size is not specified, read until end of file.
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/async/io/stream.rb', line 62 def read(size = nil) return '' if size == 0 if size until @eof or @read_buffer.size >= size # Compute the amount of data we need to read from the underlying stream: read_size = size - @read_buffer.bytesize # Don't read less than @block_size to avoid lots of small reads: fill_read_buffer(read_size > @block_size ? read_size : @block_size) end else until @eof fill_read_buffer end end return consume_read_buffer(size) end |
#read_exactly(size, exception: EOFError) ⇒ Object
97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/async/io/stream.rb', line 97 def read_exactly(size, exception: EOFError) if buffer = read(size) if buffer.bytesize != size raise exception, "could not read enough data" end return buffer end raise exception, "encountered eof while reading data" end |
#read_partial(size = nil) ⇒ Object Also known as: readpartial
Read at most size bytes from the stream. Will avoid reading from the underlying stream if possible.
83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/async/io/stream.rb', line 83 def read_partial(size = nil) return '' if size == 0 unless @eof if size and @read_buffer.bytesize < size fill_read_buffer(size > @block_size ? size : @block_size) elsif @read_buffer.empty? fill_read_buffer end end return consume_read_buffer(size) end |
#read_until(pattern, offset = 0, chomp: true) ⇒ String
Efficiently read data from the stream until encountering pattern.
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/async/io/stream.rb', line 114 def read_until(pattern, offset = 0, chomp: true) # We don't want to split on the pattern, so we subtract the size of the pattern. split_offset = pattern.bytesize - 1 until index = @read_buffer.index(pattern, offset) offset = @read_buffer.size - split_offset offset = 0 if offset < 0 return unless fill_read_buffer end @read_buffer.freeze matched = @read_buffer.byteslice(0, index+(chomp ? 0 : pattern.bytesize)) @read_buffer = @read_buffer.byteslice(index+pattern.bytesize, @read_buffer.bytesize) return matched end |
#write(string) ⇒ Object
Writes string to the buffer. When the buffer is full or #sync is true the
buffer is flushed to the underlying io.
143 144 145 146 147 148 149 150 151 152 153 154 155 156 |
# File 'lib/async/io/stream.rb', line 143 def write(string) if @write_buffer.empty? and string.bytesize >= @block_size @io.write(string) else @write_buffer << string if @write_buffer.size >= @block_size @io.write(@write_buffer) @write_buffer.clear end end return string.bytesize end |