Class: Async::IO::Stream

Inherits:
Object
  • Object
show all
Defined in:
lib/async/io/stream.rb

Constant Summary collapse

BLOCK_SIZE =

The default block size for IO buffers. BLOCK_SIZE = ENV.fetch('BLOCK_SIZE', 1024*16).to_i

1024*8

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(io, block_size: BLOCK_SIZE, sync: true) ⇒ Stream

Returns a new instance of Stream.



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/async/io/stream.rb', line 31

# Wraps `io` in a buffered stream.
#
# @param io the underlying object; it is stored and must respond to `sync=`.
# @param block_size the threshold (in bytes) stored for buffering decisions.
# @param sync the value assigned to `io.sync`.
def initialize(io, block_size: BLOCK_SIZE, sync: true)
	@io = io
	@eof = false
	
	# Hand the caller's sync preference to the underlying io so Ruby
	# does not add its own layer of buffering (when sync is true).
	@io.sync = sync
	
	@block_size = block_size
	
	# Pending data that has been read but not yet consumed, and data
	# that has been written but not yet flushed.
	@read_buffer = BinaryString.new
	@write_buffer = BinaryString.new
	
	# Scratch destination for reads from the underlying io.
	@input_buffer = BinaryString.new
end

Instance Attribute Details

#block_sizeObject (readonly)

Returns the value of attribute block_size.



48
49
50
# File 'lib/async/io/stream.rb', line 48

# Reader for the block size given to the constructor.
def block_size
	return @block_size
end

#ioObject (readonly)

Returns the value of attribute io.



47
48
49
# File 'lib/async/io/stream.rb', line 47

# Reader for the underlying io object given to the constructor.
def io
	return @io
end

Instance Method Details

#<<(string) ⇒ Object

Writes `string` to the stream and returns self.



116
117
118
119
120
# File 'lib/async/io/stream.rb', line 116

# Appends `string` via #write and returns the stream itself,
# which allows calls to be chained.
def <<(string)
	write(string)
	
	self
end

#closeObject

Closes the stream and flushes any unwritten data.



153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/async/io/stream.rb', line 153

# Closes the stream, flushing any unwritten data first.
#
# Does nothing when the underlying io already reports closed. A failure
# during the flush is logged instead of raised, and the underlying io is
# closed whether or not the flush succeeded.
def close
	return if @io.closed?
	
	begin
		flush
	rescue => error
		# We really can't do anything here unless we want #close to raise exceptions.
		# The exception is captured in a local because `$!` is only set while
		# this rescue clause is running; a logger that evaluates its block
		# later would otherwise see nil.
		Async.logger.error(self) {error}
	ensure
		@io.close
	end
end

#closed?Boolean

Returns:

  • (Boolean)


148
149
150
# File 'lib/async/io/stream.rb', line 148

# Reports whether the underlying io is closed (delegates to `@io.closed?`).
def closed?
	return @io.closed?
end

#connected?Boolean

Returns:

  • (Boolean)


144
145
146
# File 'lib/async/io/stream.rb', line 144

# Reports whether the underlying io is connected (delegates to `@io.connected?`).
def connected?
	return @io.connected?
end

#eof!Object

Raises:

  • (EOFError)


175
176
177
178
179
180
# File 'lib/async/io/stream.rb', line 175

# Forces the stream into the end-of-file state: marks it as at EOF,
# discards any buffered read data, then raises.
#
# @raise [EOFError] always.
def eof!
	@eof = true
	@read_buffer.clear
	
	raise EOFError
end

#eof?Boolean Also known as: eof

Returns true if the stream is at end of file, which means there is no more data to be read.

Returns:

  • (Boolean)


167
168
169
170
171
# File 'lib/async/io/stream.rb', line 167

# Tests for end of file. When nothing is buffered and EOF has not been
# seen yet, one fill of the read buffer is attempted first.
#
# @return truthy only when EOF has been reached and no buffered data remains.
def eof?
	if @read_buffer.empty? && !@eof
		fill_read_buffer
	end
	
	@eof && @read_buffer.empty?
end

#flushObject

Flushes buffered data to the stream.



123
124
125
126
127
128
# File 'lib/async/io/stream.rb', line 123

# Writes any buffered output via `syswrite` and empties the write buffer.
# Does nothing when the write buffer is already empty.
def flush
	return if @write_buffer.empty?
	
	syswrite(@write_buffer)
	@write_buffer.clear
end

#gets(separator = $/) ⇒ Object



130
131
132
133
134
# File 'lib/async/io/stream.rb', line 130

# Flushes pending output, then reads up to the next `separator`
# using #read_until.
#
# @param separator the delimiter to read up to; defaults to `$/`.
# @return whatever #read_until returns for that separator.
def gets(separator = $/)
	# Pending writes go out first, before we wait on input.
	flush
	
	return read_until(separator)
end

#peekObject



90
91
92
93
94
# File 'lib/async/io/stream.rb', line 90

# Yields the read buffer to the given block, refilling the buffer and
# yielding again until the block returns a truthy value or EOF is flagged.
def peek
	fill_read_buffer until yield(@read_buffer) || @eof
end

#puts(*args, separator: $/) ⇒ Object



136
137
138
139
140
141
142
# File 'lib/async/io/stream.rb', line 136

# Appends each argument followed by `separator` to the write buffer,
# then flushes.
#
# @param args the values to append, in order.
# @param separator appended after every argument; defaults to `$/`.
def puts(*args, separator: $/)
	args.each_with_object(@write_buffer) do |line, buffer|
		buffer << line << separator
	end
	
	flush
end

#read(size = nil) ⇒ Object

Reads `size` bytes from the stream. If size is not specified, read until end of file.



51
52
53
54
55
56
57
58
59
60
61
# File 'lib/async/io/stream.rb', line 51

# Reads from the stream, filling the read buffer until either EOF is
# flagged or (when `size` is given) at least `size` units are buffered,
# then hands off to `consume_read_buffer(size)`.
#
# @param size the amount to read; nil means read until EOF.
# @return an empty string when size is 0, otherwise the result of `consume_read_buffer`.
def read(size = nil)
	return '' if size == 0
	
	# With no size the second clause is never satisfied, so only EOF ends the loop.
	until @eof || (size && @read_buffer.size >= size)
		fill_read_buffer
	end
	
	consume_read_buffer(size)
end

#read_partial(size = nil) ⇒ Object

Read at most `size` bytes from the stream. Will avoid reading from the underlying stream if possible.



64
65
66
67
68
69
70
71
72
# File 'lib/async/io/stream.rb', line 64

# Reads whatever is available, performing at most one fill of the read
# buffer, and only when the buffer is empty and EOF has not been flagged.
#
# @param size the maximum amount to read, passed to `consume_read_buffer`.
# @return an empty string when size is 0, otherwise the result of `consume_read_buffer`.
def read_partial(size = nil)
	return '' if size == 0
	
	fill_read_buffer if @read_buffer.empty? && !@eof
	
	consume_read_buffer(size)
end

#read_until(pattern, offset = 0) ⇒ String

Efficiently read data from the stream until encountering pattern.

Parameters:

  • pattern (String)

    The pattern to match.

Returns:

  • (String)

    The contents of the stream up until the pattern, which is consumed but not returned.



77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/async/io/stream.rb', line 77

# Efficiently read data from the stream until encountering pattern.
#
# @param pattern [String] The pattern to match.
# @param offset [Integer] The index in the read buffer at which the first search starts.
# @return [String, nil] The contents of the stream up until the pattern, which is
#   consumed but not returned; nil if `fill_read_buffer` returns a falsy value
#   before the pattern is found (the buffered data is left in place).
def read_until(pattern, offset = 0)
	# A match may straddle two fills (e.g. "\r" at the end of one read and
	# "\n" at the start of the next), so the last `pattern.bytesize - 1`
	# bytes of already-searched data must be scanned again after each fill.
	overlap = [pattern.bytesize - 1, 0].max
	
	until index = @read_buffer.index(pattern, offset)
		offset = [@read_buffer.size - overlap, 0].max
		
		return unless fill_read_buffer
	end
	
	matched = @read_buffer.slice!(0, index)
	# Drop the pattern itself so the next read starts after it.
	@read_buffer.slice!(0, pattern.bytesize)
	
	return matched
end

#write(string) ⇒ Object

Writes `string` to the buffer. When the buffer is full or #sync is true the buffer is flushed to the underlying `io`.

Parameters:

  • string

    the string to write to the buffer.

Returns:

  • the number of bytes appended to the buffer.



100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/async/io/stream.rb', line 100

# Buffers `string` for output. A string at least `@block_size` bytes long
# arriving while the write buffer is empty is passed straight to `syswrite`;
# otherwise it is appended to the write buffer, which is written out and
# cleared once it reaches `@block_size`.
#
# @param string the data to write.
# @return the byte size of `string`.
def write(string)
	# Large write with nothing pending: skip the buffer entirely.
	if @write_buffer.empty? && string.bytesize >= @block_size
		syswrite(string)
		
		return string.bytesize
	end
	
	@write_buffer << string
	
	unless @write_buffer.size < @block_size
		syswrite(@write_buffer)
		@write_buffer.clear
	end
	
	string.bytesize
end