Class: Async::IO::Stream

Inherits:
Object
  • Object
show all
Defined in:
lib/async/io/stream.rb

Constant Summary collapse

BLOCK_SIZE =

The default block size for IO buffers. BLOCK_SIZE = ENV.fetch(‘BLOCK_SIZE’, 1024*16).to_i

1024*8

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(io, block_size: BLOCK_SIZE, sync: true) ⇒ Stream

Returns a new instance of Stream.



31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/async/io/stream.rb', line 31

def initialize(io, block_size: BLOCK_SIZE, sync: true)
  @io = io
  @eof = false
  
  # We don't want Ruby to do any IO buffering.
  @io.sync = sync
  
  @block_size = block_size
  
  @read_buffer = BinaryString.new
  @write_buffer = BinaryString.new
end

Instance Attribute Details

#block_sizeObject (readonly)

Returns the value of attribute block_size.



45
46
47
# File 'lib/async/io/stream.rb', line 45

def block_size
  @block_size
end

#ioObject (readonly)

Returns the value of attribute io.



44
45
46
# File 'lib/async/io/stream.rb', line 44

def io
  @io
end

Instance Method Details

#<<(string) ⇒ Object

Writes ‘string` to the stream and returns self.



113
114
115
116
117
# File 'lib/async/io/stream.rb', line 113

def <<(string)
  write(string)
  
  return self
end

#closeObject

Closes the stream and flushes any unwritten data.



150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/async/io/stream.rb', line 150

def close
  return if @io.closed?
  
  begin
    flush
  rescue
    # We really can't do anything here unless we want #close to raise exceptions.
    Async.logger.error(self) {$!}
  ensure
    @io.close
  end
end

#closed?Boolean

Returns:

  • (Boolean)


145
146
147
# File 'lib/async/io/stream.rb', line 145

def closed?
  @io.closed?
end

#connected?Boolean

Returns:

  • (Boolean)


141
142
143
# File 'lib/async/io/stream.rb', line 141

def connected?
  @io.connected?
end

#eof!Object

Raises:

  • (EOFError)


172
173
174
175
176
177
# File 'lib/async/io/stream.rb', line 172

def eof!
  @read_buffer.clear
  @eof = true
  
  raise EOFError
end

#eof?Boolean Also known as: eof

Returns true if the stream is at file which means there is no more data to be read.

Returns:

  • (Boolean)


164
165
166
167
168
# File 'lib/async/io/stream.rb', line 164

def eof?
  fill_read_buffer if !@eof && @read_buffer.empty?
  
  return @eof && @read_buffer.empty?
end

#flushObject

Flushes buffered data to the stream.



120
121
122
123
124
125
# File 'lib/async/io/stream.rb', line 120

def flush
  unless @write_buffer.empty?
    syswrite(@write_buffer)
    @write_buffer.clear
  end
end

#gets(separator = $/) ⇒ Object



127
128
129
130
131
# File 'lib/async/io/stream.rb', line 127

def gets(separator = $/)
  flush
  
  read_until(separator)
end

#peekObject



87
88
89
90
91
# File 'lib/async/io/stream.rb', line 87

def peek
  until yield(@read_buffer) || @eof
    fill_read_buffer
  end
end

#puts(*args, separator: $/) ⇒ Object



133
134
135
136
137
138
139
# File 'lib/async/io/stream.rb', line 133

def puts(*args, separator: $/)
  args.each do |arg|
    @write_buffer << arg << separator
  end
  
  flush
end

#read(size = nil) ⇒ Object

Reads ‘size` bytes from the stream. If size is not specified, read until end of file.



48
49
50
51
52
53
54
55
56
57
58
# File 'lib/async/io/stream.rb', line 48

def read(size = nil)
  return '' if size == 0
  
  if size
    fill_read_buffer until @eof or @read_buffer.size >= size
  else
    fill_read_buffer until @eof
  end
  
  return consume_read_buffer(size)
end

#read_partial(size = nil) ⇒ Object

Read at most ‘size` bytes from the stream. Will avoid reading from the underlying stream if possible.



61
62
63
64
65
66
67
68
69
# File 'lib/async/io/stream.rb', line 61

def read_partial(size = nil)
  return '' if size == 0
  
  if @read_buffer.empty? and !@eof
    fill_read_buffer
  end
  
  return consume_read_buffer(size)
end

#read_until(pattern, offset = 0) ⇒ String

Efficiently read data from the stream until encountering pattern.

Parameters:

  • pattern (String)

    The pattern to match.

Returns:

  • (String)

    The contents of the stream up until the pattern, which is consumed but not returned.



74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/async/io/stream.rb', line 74

def read_until(pattern, offset = 0)
  until index = @read_buffer.index(pattern, offset)
    offset = @read_buffer.size
    
    return unless fill_read_buffer
  end
  
  matched = @read_buffer.slice!(0, index)
  @read_buffer.slice!(0, pattern.bytesize)
  
  return matched
end

#write(string) ⇒ Object

Writes ‘string` to the buffer. When the buffer is full or #sync is true the buffer is flushed to the underlying `io`.

Parameters:

  • string

    the string to write to the buffer.

Returns:

  • the number of bytes appended to the buffer.



97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/async/io/stream.rb', line 97

def write(string)
  if @write_buffer.empty? and string.bytesize >= @block_size
    syswrite(string)
  else
    @write_buffer << string
    
    if @write_buffer.size >= @block_size
      syswrite(@write_buffer)
      @write_buffer.clear
    end
  end
  
  return string.bytesize
end