Class: Async::IO::Stream
- Inherits:
-
Object
- Object
- Async::IO::Stream
- Defined in:
- lib/async/io/stream.rb
Constant Summary collapse
- BLOCK_SIZE =
IO::BLOCK_SIZE
Instance Attribute Summary collapse
-
#block_size ⇒ Object
readonly
Returns the value of attribute block_size.
-
#io ⇒ Object
readonly
Returns the value of attribute io.
Class Method Summary collapse
Instance Method Summary collapse
-
#<<(string) ⇒ Object
Writes `string` to the stream and returns self.
-
#close ⇒ Object
Best effort to flush any unwritten data, and then close the underlying IO.
- #close_read ⇒ Object
- #close_write ⇒ Object
- #closed? ⇒ Boolean
- #connected? ⇒ Boolean
- #eof! ⇒ Object
-
#eof? ⇒ Boolean
(also: #eof)
Returns true if the stream is at end of file, which means there is no more data to be read.
-
#flush ⇒ Object
Flushes buffered data to the stream.
- #gets(separator = $/, **options) ⇒ Object
-
#initialize(io, block_size: BLOCK_SIZE, maximum_read_size: MAXIMUM_READ_SIZE, sync: true, deferred: false) ⇒ Stream
constructor
A new instance of Stream.
- #peek(size = nil) ⇒ Object
- #puts(*arguments, separator: $/) ⇒ Object
-
#read(size = nil) ⇒ Object
Reads `size` bytes from the stream.
- #read_exactly(size, exception: EOFError) ⇒ Object
-
#read_partial(size = nil) ⇒ Object
Read at most `size` bytes from the stream.
-
#read_until(pattern, offset = 0, chomp: true) ⇒ String
Efficiently read data from the stream until encountering pattern.
- #readpartial(size = nil) ⇒ Object
-
#write(string) ⇒ Object
Writes `string` to the buffer.
Constructor Details
#initialize(io, block_size: BLOCK_SIZE, maximum_read_size: MAXIMUM_READ_SIZE, sync: true, deferred: false) ⇒ Stream
Returns a new instance of Stream.
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/async/io/stream.rb', line 30
# Builds a stream around `io` and sets up its internal buffers.
#
# @param io [Object] the wrapped IO; must respond to `sync=`.
# @param block_size [Integer] stored in @block_size.
# @param maximum_read_size [Integer] stored in @maximum_read_size.
# @param sync [Boolean] assigned to `io.sync`.
# @param deferred [Boolean] accepted but currently ignored.
def initialize(io, block_size: BLOCK_SIZE, maximum_read_size: MAXIMUM_READ_SIZE, sync: true, deferred: false)
  @io = io
  @eof, @pending = false, 0

  # This field is ignored, but used to mean, try to buffer packets in a single iteration of the reactor.
  # @deferred = deferred

  @writing = Async::Semaphore.new(1)

  # We don't want Ruby to do any IO buffering.
  @io.sync = sync

  @block_size, @maximum_read_size = block_size, maximum_read_size

  @read_buffer = Buffer.new
  @write_buffer = Buffer.new
  @drain_buffer = Buffer.new

  # Used as destination buffer for underlying reads.
  @input_buffer = Buffer.new
end
Instance Attribute Details
#block_size ⇒ Object (readonly)
Returns the value of attribute block_size.
56 57 58 |
# File 'lib/async/io/stream.rb', line 56
# Reader for the block size given at construction time.
#
# @return [Object] the value of @block_size.
def block_size
  @block_size
end
#io ⇒ Object (readonly)
Returns the value of attribute io.
54 55 56 |
# File 'lib/async/io/stream.rb', line 54
# Reader for the wrapped IO object.
#
# @return [Object] the value of @io.
def io
  @io
end
Class Method Details
.open(path, mode = "r+", **options) ⇒ Object
18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/async/io/stream.rb', line 18
# Opens the file at `path` with `mode` and wraps it in a new stream.
#
# Without a block the stream is returned and the caller is responsible for closing it.
# With a block the stream is yielded, closed afterwards (even if the block raises),
# and the block's value is returned.
#
# @param path [String] passed to `File.open`.
# @param mode [String] passed to `File.open`.
# @param options [Hash] keyword options forwarded to the constructor.
def self.open(path, mode = "r+", **options)
  file = File.open(path, mode)
  stream = new(file, **options)

  if block_given?
    begin
      yield stream
    ensure
      stream.close
    end
  else
    stream
  end
end
Instance Method Details
#<<(string) ⇒ Object
Writes `string` to the stream and returns self.
181 182 183 184 185 |
# File 'lib/async/io/stream.rb', line 181
# Appends `string` via #write and returns the stream so calls can be chained.
#
# @param string [String] the data to write.
# @return [self]
def <<(string)
  write(string)

  self
end
#close ⇒ Object
Best effort to flush any unwritten data, and then close the underlying IO.
214 215 216 217 218 219 220 221 222 223 224 |
# File 'lib/async/io/stream.rb', line 214
# Flushes on a best-effort basis and then closes the wrapped IO.
# Does nothing when the IO is already closed.
def close
  unless @io.closed?
    begin
      flush
    rescue StandardError
      # We really can't do anything here unless we want #close to raise exceptions.
    ensure
      @io.close
    end
  end
end
#close_read ⇒ Object
203 204 205 |
# File 'lib/async/io/stream.rb', line 203
# Delegates to `close_read` on the wrapped IO and returns its result.
def close_read
  @io.close_read
end
#close_write ⇒ Object
207 208 209 210 211 |
# File 'lib/async/io/stream.rb', line 207
# Flushes, then calls `close_write` on the wrapped IO.
# The write side is closed even when the flush raises; the exception still propagates.
def close_write
  begin
    flush
  ensure
    @io.close_write
  end
end
#closed? ⇒ Boolean
199 200 201 |
# File 'lib/async/io/stream.rb', line 199
# Reports whether the wrapped IO says it is closed.
#
# @return [Boolean] the result of `closed?` on the wrapped IO.
def closed?
  @io.closed?
end
#connected? ⇒ Boolean
195 196 197 |
# File 'lib/async/io/stream.rb', line 195
# Reports whether the wrapped IO says it is connected.
#
# @return [Boolean] the result of `connected?` on the wrapped IO.
def connected?
  @io.connected?
end
#eof! ⇒ Object
239 240 241 242 243 244 |
# File 'lib/async/io/stream.rb', line 239
# Forces the stream into the end-of-file state: discards buffered read data,
# sets the eof flag, and raises.
#
# @raise [EOFError] always.
def eof!
  @read_buffer.clear
  @eof = true

  raise EOFError
end
#eof? ⇒ Boolean Also known as: eof
Returns true if the stream is at end of file, which means there is no more data to be read.
227 228 229 230 231 232 233 234 235 |
# File 'lib/async/io/stream.rb', line 227
# Reports whether there is no more data to read.
#
# Buffered data always means "not at eof". Otherwise the stream's own eof flag
# is consulted, and only then is the wrapped IO asked.
#
# @return [Boolean]
def eof?
  return false unless @read_buffer.empty?
  return true if @eof

  @io.eof?
end
#flush ⇒ Object
Flushes buffered data to the stream.
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/async/io/stream.rb', line 150
# Writes any buffered output to the wrapped IO.
# Returns immediately when the write buffer is empty.
def flush
  return if @write_buffer.empty?

  @writing.acquire do
    # Flip the write buffer and drain buffer, so new writes go to the other buffer
    # while this one is written out:
    outgoing = @write_buffer
    @write_buffer = @drain_buffer
    @drain_buffer = outgoing

    begin
      @io.write(@drain_buffer)
    ensure
      # If the write operation fails, we still need to clear this buffer, and the data is essentially lost.
      @drain_buffer.clear
    end
  end
end
#gets(separator = $/, **options) ⇒ Object
145 146 147 |
# File 'lib/async/io/stream.rb', line 145
# Reads up to the next `separator` by delegating to #read_until.
#
# @param separator [String] the pattern to read up to; defaults to `$/`.
# @param options [Hash] keyword options forwarded unchanged to #read_until.
# @return [Object] whatever #read_until returns.
def gets(separator = $/, **options)
  read_until(separator, **options)
end
#peek(size = nil) ⇒ Object
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/async/io/stream.rb', line 128
# Looks at buffered data without consuming it.
#
# With `size`, fills the read buffer until it holds at least `size` bytes (or eof is
# reached) and returns a copy of at most the first `size` bytes.
#
# Without `size`, keeps filling the read buffer until the given block returns a truthy
# value for the buffer or eof is reached, then returns the read buffer itself.
# With no block, it fills until eof.
#
# @param size [Integer, nil] the number of bytes to look at.
# @yield [buffer] the current read buffer; return truthy to stop filling.
# @return [String] the peeked data.
def peek(size = nil)
  if size
    until @eof || @read_buffer.bytesize >= size
      # Compute the amount of data we need to read from the underlying stream:
      read_size = size - @read_buffer.bytesize

      # Don't read less than @block_size to avoid lots of small reads:
      fill_read_buffer(read_size > @block_size ? read_size : @block_size)
    end

    # The loop above counts bytes, so slice by bytes as well. byteslice already
    # clamps to the end of the buffer, so no explicit minimum is needed.
    return @read_buffer.byteslice(0, size)
  end

  until (block_given? && yield(@read_buffer)) || @eof
    fill_read_buffer
  end

  @read_buffer
end
#puts(*arguments, separator: $/) ⇒ Object
187 188 189 190 191 192 193 |
# File 'lib/async/io/stream.rb', line 187
# Appends each argument followed by `separator` to the write buffer, then flushes.
#
# @param arguments [Array] the values to write, in order.
# @param separator [String] appended after every argument; defaults to `$/`.
# @return [Object] whatever #flush returns.
def puts(*arguments, separator: $/)
  arguments.each { |line| @write_buffer << line << separator }

  flush
end
#read(size = nil) ⇒ Object
Reads `size` bytes from the stream. If size is not specified, read until end of file.
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/async/io/stream.rb', line 59
# Reads from the stream through the read buffer.
#
# With `size`, fills the read buffer until it holds at least `size` bytes or eof is
# reached. Without `size`, fills until eof. The result is whatever
# `consume_read_buffer(size)` returns.
#
# @param size [Integer, nil] the number of bytes wanted; 0 returns an empty binary string.
def read(size = nil)
  return String.new(encoding: Encoding::BINARY) if size == 0

  if size
    until @eof || @read_buffer.bytesize >= size
      # Ask for what is still missing, but never less than @block_size, to avoid
      # lots of small reads:
      missing = size - @read_buffer.bytesize
      fill_read_buffer([missing, @block_size].max)
    end
  else
    fill_read_buffer until @eof
  end

  consume_read_buffer(size)
end
#read_exactly(size, exception: EOFError) ⇒ Object
90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/async/io/stream.rb', line 90
# Reads exactly `size` bytes via #read, raising when that is not possible.
#
# @param size [Integer] the number of bytes required.
# @param exception [Class] the exception class to raise on failure.
# @return [String] the data read, exactly `size` bytes long.
# @raise [exception] when #read returns nothing, or returns a different number of bytes.
def read_exactly(size, exception: EOFError)
  buffer = read(size)

  raise exception, "encountered eof while reading data" unless buffer
  raise exception, "could not read enough data" if buffer.bytesize != size

  buffer
end
#read_partial(size = nil) ⇒ Object
Read at most `size` bytes from the stream. Will avoid reading from the underlying stream if possible.
80 81 82 83 84 85 86 87 88 |
# File 'lib/async/io/stream.rb', line 80
# Returns buffered data if there is any, filling the read buffer once only when it
# is empty and eof has not been reached.
#
# @param size [Integer, nil] passed to `consume_read_buffer`; 0 returns an empty binary string.
def read_partial(size = nil)
  return String.new(encoding: Encoding::BINARY) if size == 0

  fill_read_buffer if !@eof && @read_buffer.empty?

  consume_read_buffer(size)
end
#read_until(pattern, offset = 0, chomp: true) ⇒ String
Efficiently read data from the stream until encountering pattern.
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/async/io/stream.rb', line 109
# Reads from the stream until `pattern` is found in the read buffer.
#
# @param pattern [String] the delimiter to search for.
# @param offset [Integer] where in the read buffer to start searching.
# @param chomp [Boolean] when true the delimiter is left out of the result.
# @return [String, nil] the data up to the delimiter, or nil when `fill_read_buffer`
#   returns a falsy value before the delimiter is found.
def read_until(pattern, offset = 0, chomp: true)
  pattern_size = pattern.bytesize

  # The pattern may straddle two reads, so the next search has to start this many
  # bytes before the end of what has already been scanned.
  overlap = pattern_size - 1

  until (index = @read_buffer.index(pattern, offset))
    offset = [@read_buffer.bytesize - overlap, 0].max

    return unless fill_read_buffer
  end

  @read_buffer.freeze

  match_end = index + pattern_size
  matched = @read_buffer.byteslice(0, chomp ? index : match_end)
  @read_buffer = @read_buffer.byteslice(match_end, @read_buffer.bytesize)

  matched
end
#readpartial(size = nil) ⇒ Object
102 103 104 |
# File 'lib/async/io/stream.rb', line 102
# Like #read_partial, but raises instead of returning a falsy value.
#
# @param size [Integer, nil] passed to #read_partial.
# @return [Object] the truthy result of #read_partial.
# @raise [EOFError] when #read_partial returns nil or false.
def readpartial(size = nil)
  read_partial(size) || raise(EOFError, "Encountered eof while reading data!")
end
#write(string) ⇒ Object
Writes `string` to the buffer. When the buffer is full or #sync is true, the buffer is flushed to the underlying `io`.
170 171 172 173 174 175 176 177 178 |
# File 'lib/async/io/stream.rb', line 170
# Appends `string` to the write buffer and flushes once the buffer holds at least
# @block_size bytes.
#
# @param string [String] the data to write.
# @return [Integer] the byte size of `string`.
def write(string)
  @write_buffer << string
  flush if @write_buffer.bytesize >= @block_size

  string.bytesize
end