Class: Async::IO::Stream
- Inherits:
-
Object
- Object
- Async::IO::Stream
- Defined in:
- lib/async/io/stream.rb
Constant Summary collapse
- BLOCK_SIZE =
IO::BLOCK_SIZE
Instance Attribute Summary collapse
-
#block_size ⇒ Object
readonly
Returns the value of attribute block_size.
-
#io ⇒ Object
readonly
Returns the value of attribute io.
Class Method Summary collapse
Instance Method Summary collapse
-
#<<(string) ⇒ Object
Writes
string
to the stream and returns self. -
#close ⇒ Object
Best effort to flush any unwritten data, and then close the underlying IO.
- #close_read ⇒ Object
- #close_write ⇒ Object
- #closed? ⇒ Boolean
- #connected? ⇒ Boolean
- #eof! ⇒ Object
-
#eof? ⇒ Boolean
(also: #eof)
Returns true if the stream is at end of file, which means there is no more data to be read.
-
#flush(deferred: @deferred) ⇒ Object
Flushes buffered data to the stream.
- #gets(separator = $/, **options) ⇒ Object
-
#initialize(io, block_size: BLOCK_SIZE, maximum_read_size: MAXIMUM_READ_SIZE, sync: true, deferred: false) ⇒ Stream
constructor
A new instance of Stream.
- #peek ⇒ Object
- #puts(*arguments, separator: $/) ⇒ Object
-
#read(size = nil) ⇒ Object
Reads
size
bytes from the stream. - #read_exactly(size, exception: EOFError) ⇒ Object
-
#read_partial(size = nil) ⇒ Object
(also: #readpartial)
Read at most
size
bytes from the stream. -
#read_until(pattern, offset = 0, chomp: true) ⇒ String
Efficiently read data from the stream until encountering pattern.
-
#write(string) ⇒ Object
Writes
string
to the buffer.
Constructor Details
#initialize(io, block_size: BLOCK_SIZE, maximum_read_size: MAXIMUM_READ_SIZE, sync: true, deferred: false) ⇒ Stream
Returns a new instance of Stream.
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/async/io/stream.rb', line 45
# Wraps +io+ in a buffered stream.
#
# @param io the underlying IO object; its +sync+ attribute is overwritten.
# @param block_size [Integer] minimum size requested from the underlying IO by
#   #read, and the write-buffer size at which #write triggers a flush.
# @param maximum_read_size [Integer] stored for use by the read path (the
#   consumer is not part of this listing).
# @param sync [Boolean] value assigned to +io.sync+.
# @param deferred [Boolean] default for the +deferred:+ keyword of #flush.
def initialize(io, block_size: BLOCK_SIZE, maximum_read_size: MAXIMUM_READ_SIZE, sync: true, deferred: false)
  @io = io
  @eof = false

  # Deferred-flush bookkeeping, see #flush.
  @deferred = deferred
  @pending = 0

  # NOTE(review): presumably serialises writers; its users are outside this listing.
  @writing = Async::Semaphore.new(1)

  # Buffering is handled by this class, so Ruby's own IO buffering is disabled by default.
  @io.sync = sync

  @block_size = block_size
  @maximum_read_size = maximum_read_size

  @read_buffer = Buffer.new
  @write_buffer = Buffer.new
  @drain_buffer = Buffer.new

  # Destination buffer handed to the underlying IO when reading.
  @input_buffer = Buffer.new
end
Instance Attribute Details
#block_size ⇒ Object (readonly)
Returns the value of attribute block_size.
70 71 72 |
# File 'lib/async/io/stream.rb', line 70
# Reader for the block size given to the constructor.
def block_size
  return @block_size
end
#io ⇒ Object (readonly)
Returns the value of attribute io.
68 69 70 |
# File 'lib/async/io/stream.rb', line 68
# Reader for the underlying IO object wrapped by this stream.
def io
  return @io
end
Class Method Details
.open(path, mode = "r+", **options) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/async/io/stream.rb', line 33
# Opens the file at +path+ and wraps it in a new stream.
#
# Without a block the stream is returned and the caller must close it.
# With a block the stream is yielded, closed when the block finishes (even if
# it raises), and the block's value is returned.
#
# @param path [String] file to open.
# @param mode [String] mode passed to File.open.
# @param options keyword arguments forwarded to the constructor.
def self.open(path, mode = "r+", **options)
  file = File.open(path, mode)

  begin
    stream = self.new(file, **options)
  ensure
    # If the constructor raised, nothing else owns the file handle: close it
    # here rather than leaking it until garbage collection.
    file.close unless stream
  end

  return stream unless block_given?

  begin
    yield stream
  ensure
    stream.close
  end
end
Instance Method Details
#<<(string) ⇒ Object
Writes string
to the stream and returns self.
186 187 188 189 190 |
# File 'lib/async/io/stream.rb', line 186
# Appends +string+ via #write and returns the stream itself, so calls can be
# chained.
def <<(string)
  write(string)

  self
end
#close ⇒ Object
Best effort to flush any unwritten data, and then close the underlying IO.
219 220 221 222 223 224 225 226 227 228 229 |
# File 'lib/async/io/stream.rb', line 219
# Closes the underlying IO after a best-effort attempt to drain buffered
# writes. Does nothing if the IO is already closed.
def close
  unless @io.closed?
    begin
      drain_write_buffer unless @write_buffer.empty?
    rescue
      # Deliberately swallowed: #close is not supposed to raise just because
      # the final drain failed.
    ensure
      @io.close
    end
  end
end
#close_read ⇒ Object
208 209 210 |
# File 'lib/async/io/stream.rb', line 208
# Delegates to +close_read+ on the underlying IO.
def close_read
  return @io.close_read
end
#close_write ⇒ Object
212 213 214 215 216 |
# File 'lib/async/io/stream.rb', line 212
# Drains any buffered writes, then calls +close_write+ on the underlying IO.
# The IO's +close_write+ is invoked even when draining raises.
def close_write
  begin
    drain_write_buffer unless @write_buffer.empty?
  ensure
    @io.close_write
  end
end
#closed? ⇒ Boolean
204 205 206 |
# File 'lib/async/io/stream.rb', line 204
# Reports whether the underlying IO is closed.
def closed?
  return @io.closed?
end
#connected? ⇒ Boolean
200 201 202 |
# File 'lib/async/io/stream.rb', line 200
# Delegates to +connected?+ on the underlying IO.
def connected?
  return @io.connected?
end
#eof! ⇒ Object
244 245 246 247 248 249 |
# File 'lib/async/io/stream.rb', line 244
# Forces the stream into the end-of-file state: discards buffered input,
# sets the EOF flag and raises.
#
# @raise [EOFError] always.
def eof!
  @read_buffer.clear
  @eof = true

  raise EOFError
end
#eof? ⇒ Boolean Also known as: eof
Returns true if the stream is at end of file, which means there is no more data to be read.
232 233 234 235 236 237 238 239 240 |
# File 'lib/async/io/stream.rb', line 232
# Whether there is no more data to read.
#
# Buffered data always means "not at EOF"; otherwise a previously recorded
# EOF wins, and only as a last resort is the underlying IO asked.
def eof?
  return false unless @read_buffer.empty?
  return true if @eof

  @io.eof?
end
#flush(deferred: @deferred) ⇒ Object
Flushes buffered data to the stream.
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
# File 'lib/async/io/stream.rb', line 151
# Writes buffered data out to the underlying IO.
#
# @param deferred [Boolean] when true and a current task exists, the drain is
#   postponed until after the task has yielded once, so that several flush
#   requests made in the meantime are served by a single drain.
def flush(deferred: @deferred)
  task = deferred && Task.current?

  # Immediate mode: no deferral requested, or no task to yield from.
  unless task
    return drain_write_buffer unless @write_buffer.empty?
    return
  end

  # @pending counts deferred flush requests since the last drain. It only
  # drives the deferral logic below and must not be modified elsewhere.
  @pending += 1

  # Only the first request yields and drains; later ones rely on it.
  return unless @pending == 1

  task.yield

  begin
    drain_write_buffer unless @write_buffer.empty?
  ensure
    # Whatever happened, no deferred flush is outstanding any more.
    @pending = 0
  end
end
#gets(separator = $/, **options) ⇒ Object
146 147 148 |
# File 'lib/async/io/stream.rb', line 146
# Reads up to the next occurrence of +separator+ by delegating to #read_until.
#
# @param separator [String] pattern to read up to; defaults to +$/+.
# @param options keyword arguments forwarded to #read_until (e.g. +chomp:+).
# @return whatever #read_until returns.
def gets(separator = $/, **options)
  # Named keyword splat, as in the documented signature; the anonymous `**`
  # form only parses on Ruby 3.2 and later.
  read_until(separator, **options)
end
#peek ⇒ Object
140 141 142 143 144 |
# File 'lib/async/io/stream.rb', line 140
# Repeatedly yields the read buffer to the given block, pulling more data
# from the underlying IO between calls, until the block returns a truthy
# value or end of file has been recorded.
#
# @yield [buffer] the current read buffer.
def peek
  fill_read_buffer until yield(@read_buffer) || @eof
end
#puts(*arguments, separator: $/) ⇒ Object
192 193 194 195 196 197 198 |
# File 'lib/async/io/stream.rb', line 192
# Appends each argument followed by +separator+ to the write buffer, then
# flushes.
#
# @param arguments values appended to the write buffer in order.
# @param separator [String] terminator written after every argument.
def puts(*arguments, separator: $/)
  arguments.each { |line| @write_buffer << line << separator }

  flush
end
#read(size = nil) ⇒ Object
Reads size
bytes from the stream. If size is not specified, read until end of file.
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/async/io/stream.rb', line 73
# Reads +size+ bytes from the stream, or everything up to end of file when
# +size+ is nil.
#
# @param size [Integer, nil] number of bytes wanted.
# @return the result of consuming +size+ bytes from the read buffer; an empty
#   string when +size+ is 0.
def read(size = nil)
  return '' if size == 0

  if size.nil?
    # No limit: keep filling until end of file.
    fill_read_buffer until @eof
  else
    until @eof || @read_buffer.bytesize >= size
      missing = size - @read_buffer.bytesize

      # Never request less than @block_size, to avoid many tiny reads.
      fill_read_buffer([missing, @block_size].max)
    end
  end

  consume_read_buffer(size)
end
#read_exactly(size, exception: EOFError) ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/async/io/stream.rb', line 104
# Reads exactly +size+ bytes or raises.
#
# @param size [Integer] number of bytes required.
# @param exception [Class] exception class raised on failure.
# @raise [exception] when #read returns nothing, or fewer/more bytes than
#   requested.
def read_exactly(size, exception: EOFError)
  buffer = read(size)

  raise exception, "encountered eof while reading data" unless buffer
  raise exception, "could not read enough data" if buffer.bytesize != size

  return buffer
end
#read_partial(size = nil) ⇒ Object Also known as: readpartial
Read at most size
bytes from the stream. Will avoid reading from the underlying stream if possible.
94 95 96 97 98 99 100 101 102 |
# File 'lib/async/io/stream.rb', line 94
# Reads at most +size+ bytes. The underlying IO is only touched when the
# read buffer is empty and end of file has not been recorded.
#
# @param size [Integer, nil] upper bound on the number of bytes returned.
def read_partial(size = nil)
  return '' if size == 0

  needs_data = !@eof && @read_buffer.empty?
  fill_read_buffer if needs_data

  consume_read_buffer(size)
end
#read_until(pattern, offset = 0, chomp: true) ⇒ String
Efficiently read data from the stream until encountering pattern.
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/async/io/stream.rb', line 121
# Reads from the stream up to the next occurrence of +pattern+.
#
# @param pattern [String] delimiter to search for.
# @param offset [Integer] position in the read buffer at which the search starts.
# @param chomp [Boolean] when true the delimiter is left out of the result.
# @return [String, nil] the data before (or including) the delimiter, or nil
#   if the read buffer could not be filled further before a match was found.
#   The delimiter is always removed from the read buffer.
def read_until(pattern, offset = 0, chomp: true)
  pattern_size = pattern.bytesize

  # A delimiter may straddle two fills, so the next search restarts this many
  # bytes before the end of what has been scanned already.
  overlap = pattern_size - 1

  until index = @read_buffer.index(pattern, offset)
    offset = [@read_buffer.bytesize - overlap, 0].max

    return unless fill_read_buffer
  end

  @read_buffer.freeze

  matched_size = chomp ? index : index + pattern_size
  matched = @read_buffer.byteslice(0, matched_size)
  @read_buffer = @read_buffer.byteslice(index + pattern_size, @read_buffer.bytesize)

  return matched
end
#write(string) ⇒ Object
Writes string
to the buffer. When the buffer is full or #sync is true the
buffer is flushed to the underlying io
.
175 176 177 178 179 180 181 182 183 |
# File 'lib/async/io/stream.rb', line 175
# Appends +string+ to the write buffer and flushes once the buffer holds at
# least @block_size bytes.
#
# @param string [String] data to write.
# @return [Integer] the byte size of +string+.
def write(string)
  @write_buffer << string

  flush if @write_buffer.bytesize >= @block_size

  return string.bytesize
end