Class: Async::IO::Stream

Inherits:
Object
  • Object
show all
Defined in:
lib/async/io/stream.rb

Constant Summary collapse

BLOCK_SIZE =

The default block size for IO buffers. BLOCK_SIZE = ENV.fetch(‘BLOCK_SIZE’, 1024*16).to_i

1024*8

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(io, block_size: BLOCK_SIZE, sync: true) ⇒ Stream

Returns a new instance of Stream.



31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/async/io/stream.rb', line 31

def initialize(io, block_size: BLOCK_SIZE, sync: true)
	@io = io
	@eof = false
	
	# We don't want Ruby to do any IO buffering.
	@io.sync = sync
	
	@block_size = block_size
	
	@read_buffer = BinaryString.new
	@write_buffer = BinaryString.new
end

Instance Attribute Details

#block_sizeObject (readonly)

Returns the value of attribute block_size.



45
46
47
# File 'lib/async/io/stream.rb', line 45

def block_size
  @block_size
end

#ioObject (readonly)

Returns the value of attribute io.



44
45
46
# File 'lib/async/io/stream.rb', line 44

def io
  @io
end

Instance Method Details

#<<(string) ⇒ Object

Writes ‘string` to the stream and returns self.



113
114
115
116
117
# File 'lib/async/io/stream.rb', line 113

def <<(string)
	write(string)
	
	return self
end

#closeObject

Closes the stream and flushes any unwritten data.



142
143
144
145
146
# File 'lib/async/io/stream.rb', line 142

def close
	flush
ensure
	@io.close
end

#eof?Boolean Also known as: eof

Returns true if the stream is at file which means there is no more data to be read.

Returns:

  • (Boolean)


149
150
151
152
153
# File 'lib/async/io/stream.rb', line 149

def eof?
	fill_read_buffer if !@eof && @read_buffer.empty?
	
	return @eof && @read_buffer.empty?
end

#flushObject

Flushes buffered data to the stream.



120
121
122
123
124
125
# File 'lib/async/io/stream.rb', line 120

def flush
	unless @write_buffer.empty?
		syswrite(@write_buffer)
		@write_buffer.clear
	end
end

#gets(separator = $/) ⇒ Object



127
128
129
130
131
# File 'lib/async/io/stream.rb', line 127

def gets(separator = $/)
	flush
	
	read_until(separator)
end

#peekObject



87
88
89
90
91
# File 'lib/async/io/stream.rb', line 87

def peek
	until yield(@read_buffer) || @eof
		fill_read_buffer
	end
end

#puts(*args, separator: $/) ⇒ Object



133
134
135
136
137
138
139
# File 'lib/async/io/stream.rb', line 133

def puts(*args, separator: $/)
	args.each do |arg|
		@write_buffer << arg << separator
	end
	
	flush
end

#read(size = nil) ⇒ Object

Reads ‘size` bytes from the stream. If size is not specified, read until end of file.



48
49
50
51
52
53
54
55
56
57
58
# File 'lib/async/io/stream.rb', line 48

def read(size = nil)
	return '' if size == 0
	
	if size
		fill_read_buffer until @eof or @read_buffer.size >= size
	else
		fill_read_buffer until @eof
	end
	
	return consume_read_buffer(size)
end

#read_partial(size = nil) ⇒ Object

Read at most ‘size` bytes from the stream. Will avoid reading from the underlying stream if possible.



61
62
63
64
65
66
67
68
69
# File 'lib/async/io/stream.rb', line 61

def read_partial(size = nil)
	return '' if size == 0
	
	if @read_buffer.empty? and !@eof
		fill_read_buffer
	end
	
	return consume_read_buffer(size)
end

#read_until(pattern, offset = 0) ⇒ String

Efficiently read data from the stream until encountering pattern.

Parameters:

  • pattern (String)

    The pattern to match.

Returns:

  • (String)

    The contents of the stream up until the pattern, which is consumed but not returned.



74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/async/io/stream.rb', line 74

def read_until(pattern, offset = 0)
	until index = @read_buffer.index(pattern, offset)
		offset = @read_buffer.size
		
		return unless fill_read_buffer
	end
	
	matched = @read_buffer.slice!(0, index)
	@read_buffer.slice!(0, pattern.bytesize)
	
	return matched
end

#write(string) ⇒ Object

Writes ‘string` to the buffer. When the buffer is full or #sync is true the buffer is flushed to the underlying `io`.

Parameters:

  • string

    the string to write to the buffer.

Returns:

  • the number of bytes appended to the buffer.



97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/async/io/stream.rb', line 97

def write(string)
	if @write_buffer.empty? and string.bytesize >= @block_size
		syswrite(string)
	else
		@write_buffer << string
		
		if @write_buffer.size >= @block_size
			syswrite(@write_buffer)
			@write_buffer.clear
		end
	end
	
	return string.bytesize
end