Class: Celluloid::IO::Stream
- Includes:
- Enumerable
- Defined in:
- lib/celluloid/io/stream.rb
Overview
Base class of all streams in Celluloid::IO
Direct Known Subclasses
Defined Under Namespace
Classes: Latch
Constant Summary
Constants inherited from Socket
Celluloid::IO::Socket::Constants
Instance Attribute Summary
- #sync ⇒ Object
  The “sync mode” of the stream.
Instance Method Summary
- #<<(s) ⇒ Object
  Writes s to the stream.
- #close ⇒ Object
  Closes the stream and flushes any unwritten data.
- #each(eol = $/) ⇒ Object (also: #each_line)
  Executes the block for every line in the stream where lines are separated by eol.
- #each_byte ⇒ Object
  Calls the given block once for each byte in the stream.
- #eof? ⇒ Boolean (also: #eof)
  Returns true if the stream is at end of file, which means there is no more data to be read.
- #flush ⇒ Object
  Flushes buffered data to the stream.
- #getc ⇒ Object
  Reads one character from the stream.
- #gets(eol = $/, limit = nil) ⇒ Object
  Reads the next line from the stream.
- #initialize(socket) ⇒ Stream (constructor)
  A new instance of Stream.
- #print(*args) ⇒ Object
  Writes args to the stream.
- #printf(s, *args) ⇒ Object
  Formats and writes to the stream converting parameters under control of the format string.
- #puts(*args) ⇒ Object
  Writes args to the stream along with a record separator.
- #read(size = nil, buf = nil) ⇒ Object
  Reads size bytes from the stream.
- #readchar ⇒ Object
  Reads a one-character string from the stream.
- #readline(eol = $/) ⇒ Object
  Reads a line from the stream which is separated by eol.
- #readlines(eol = $/) ⇒ Object
  Reads lines from the stream which are separated by eol.
- #readpartial(maxlen, buf = nil) ⇒ Object
  Reads at most maxlen bytes from the stream.
- #sysread(length = nil, buffer = nil) ⇒ Object
  System read via the nonblocking subsystem.
- #syswrite(string) ⇒ Object
  System write via the nonblocking subsystem.
- #ungetc(c) ⇒ Object
  Pushes character c back onto the stream such that a subsequent buffered character read will return it.
- #wait_readable ⇒ Object
  Wait until the current object is readable.
- #wait_writable ⇒ Object
  Wait until the current object is writable.
- #write(s) ⇒ Object
  Writes s to the stream.
Methods inherited from Socket
Constructor Details
#initialize(socket) ⇒ Stream
Returns a new instance of Stream.
    # File 'lib/celluloid/io/stream.rb', line 19
    def initialize(socket)
      super
      @eof = false
      @sync = true
      @read_buffer = ''.force_encoding(Encoding::ASCII_8BIT)
      @write_buffer = ''.force_encoding(Encoding::ASCII_8BIT)
      @read_latch = Latch.new
      @write_latch = Latch.new
    end
Instance Attribute Details
#sync ⇒ Object
The “sync mode” of the stream.
See IO#sync for full details.
    # File 'lib/celluloid/io/stream.rb', line 17
    def sync
      @sync
    end
Instance Method Details
#<<(s) ⇒ Object
Writes s to the stream. s will be converted to a String using String#to_s.
    # File 'lib/celluloid/io/stream.rb', line 256
    def <<(s)
      do_write(s)
      self
    end
#close ⇒ Object
Closes the stream and flushes any unwritten data.
    # File 'lib/celluloid/io/stream.rb', line 311
    def close
      flush rescue nil
      super
    end
#each(eol = $/) ⇒ Object Also known as: each_line
Executes the block for every line in the stream where lines are separated by eol.
See also #gets.
    # File 'lib/celluloid/io/stream.rb', line 181
    def each(eol=$/)
      while line = self.gets(eol)
        yield line
      end
    end
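Because the class includes Enumerable and defines #each, helpers such as map or first should work on top of the line iteration shown in the listing. The following is a minimal sketch of that pattern, not Celluloid::IO code: LineSourceSketch is a hypothetical class, and a StringIO from the standard library stands in for a real socket.

```ruby
require "stringio"

# Hypothetical stand-in: wraps any object that responds to #gets(eol).
class LineSourceSketch
  include Enumerable

  def initialize(io)
    @io = io
  end

  # Same shape as the listing: yield lines until gets returns nil.
  def each(eol = "\n")
    while (line = @io.gets(eol))
      yield line
    end
  end
end

source = LineSourceSketch.new(StringIO.new("a\nb\nc\n"))
p source.map(&:chomp) # prints ["a", "b", "c"]
```

Enumerable calls each without arguments, so the default separator applies to every derived method.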
#each_byte ⇒ Object
Calls the given block once for each byte in the stream.
    # File 'lib/celluloid/io/stream.rb', line 216
    def each_byte # :yields: byte
      while c = getc
        yield(c.ord)
      end
    end
#eof? ⇒ Boolean Also known as: eof
Returns true if the stream is at end of file, which means there is no more data to be read.
    # File 'lib/celluloid/io/stream.rb', line 241
    def eof?
      fill_rbuff if !@eof && @read_buffer.empty?
      @eof && @read_buffer.empty?
    end
#flush ⇒ Object
Flushes buffered data to the stream.
    # File 'lib/celluloid/io/stream.rb', line 301
    def flush
      osync = @sync
      @sync = true
      do_write ""
      return self
    ensure
      @sync = osync
    end
#getc ⇒ Object
Reads one character from the stream. Returns nil if called at end of file.
    # File 'lib/celluloid/io/stream.rb', line 211
    def getc
      read(1)
    end
#gets(eol = $/, limit = nil) ⇒ Object
Reads the next line from the stream. Lines are separated by eol. If limit is provided the result will not be longer than the given number of bytes.
eol may be a String or Regexp.
Unlike IO#gets the line read will not be assigned to $_.
Unlike IO#gets the separator must be provided if a limit is provided.
    # File 'lib/celluloid/io/stream.rb', line 155
    def gets(eol=$/, limit=nil)
      idx = @read_buffer.index(eol)

      until @eof
        break if idx
        fill_rbuff
        idx = @read_buffer.index(eol)
      end

      if eol.is_a?(Regexp)
        size = idx ? idx+$&.size : nil
      else
        size = idx ? idx+eol.size : nil
      end

      if limit and limit >= 0
        size = [size, limit].min
      end

      consume_rbuff(size)
    end
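The buffering logic in this listing can be tried in isolation. The sketch below is an assumption-laden model, not the library's implementation: LineBufferSketch is a hypothetical class, its fill_rbuff pulls from an in-memory array of chunks instead of a socket, and consume_rbuff is a guess at what the private helper does. It also uses compact before min so that a nil size at end of file does not raise, which is a choice of the sketch.

```ruby
# Hypothetical model of the read buffer; chunks simulate data arriving
# from the network in arbitrary pieces.
class LineBufferSketch
  def initialize(chunks)
    @chunks = chunks.dup
    @read_buffer = String.new
    @eof = false
  end

  def gets(eol = "\n", limit = nil)
    idx = @read_buffer.index(eol)
    until @eof
      break if idx
      fill_rbuff
      idx = @read_buffer.index(eol)
    end

    # For a Regexp separator, $& holds the text matched by the last index call.
    sep_size = eol.is_a?(Regexp) ? ($& ? $&.size : 0) : eol.size
    size = idx ? idx + sep_size : nil
    size = [size, limit].compact.min if limit && limit >= 0
    consume_rbuff(size)
  end

  private

  # Assumed behaviour: append the next chunk, or mark end of file.
  def fill_rbuff
    chunk = @chunks.shift
    chunk ? @read_buffer << chunk : @eof = true
  end

  # Assumed behaviour: remove and return size bytes (or everything), nil if empty.
  def consume_rbuff(size = nil)
    return nil if @read_buffer.empty?
    @read_buffer.slice!(0, size || @read_buffer.size)
  end
end

buf = LineBufferSketch.new(["GET / HT", "TP/1.1\r\nHost: x\r\n", "tail"])
p buf.gets("\r\n")    # prints "GET / HTTP/1.1\r\n"
p buf.gets(/\r?\n/)   # prints "Host: x\r\n"
p buf.gets("\r\n", 2) # prints "ta"
```

A line split across two chunks is returned whole, because the loop keeps filling the buffer until the separator appears or the source is exhausted.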
#print(*args) ⇒ Object
Writes args to the stream.
See IO#print for full details.
    # File 'lib/celluloid/io/stream.rb', line 284
    def print(*args)
      s = ""
      args.each { |arg| s << arg.to_s }
      do_write(s)
      nil
    end
#printf(s, *args) ⇒ Object
Formats and writes to the stream converting parameters under control of the format string.
See Kernel#sprintf for format string details.
    # File 'lib/celluloid/io/stream.rb', line 295
    def printf(s, *args)
      do_write(s % args)
      nil
    end
#puts(*args) ⇒ Object
Writes args to the stream along with a record separator.
See IO#puts for full details.
    # File 'lib/celluloid/io/stream.rb', line 264
    def puts(*args)
      s = ""
      if args.empty?
        s << "\n"
      end

      args.each do |arg|
        s << arg.to_s
        if $/ && /\n\z/ !~ s
          s << "\n"
        end
      end

      do_write(s)
      nil
    end
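To see which bytes the listing would hand to the writer, the string-building part can be lifted into a standalone helper. This is a sketch only: puts_payload is a hypothetical name, the `$/ &&` guard from the listing is omitted for brevity, and nothing is written to a stream.

```ruby
# Hypothetical helper mirroring the newline rule in the listing:
# append each argument's to_s, then a newline unless one is already there.
def puts_payload(*args)
  s = String.new
  s << "\n" if args.empty?
  args.each do |arg|
    s << arg.to_s
    s << "\n" unless s.end_with?("\n")
  end
  s
end

p puts_payload("a", "b\n", 3) # prints "a\nb\n3\n"
p puts_payload                # prints "\n"
```

Since the listing calls to_s on each argument, an Array argument is written in its inspect-like form rather than flattened one element per line as IO#puts would do.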
#read(size = nil, buf = nil) ⇒ Object
Reads size bytes from the stream. If buf is provided it must reference a string which will receive the data.
See IO#read for full details.
    # File 'lib/celluloid/io/stream.rb', line 87
    def read(size=nil, buf=nil)
      if size == 0
        if buf
          buf.clear
          return buf
        else
          return ""
        end
      end

      until @eof
        break if size && size <= @read_buffer.size
        fill_rbuff
        break unless size
      end

      ret = consume_rbuff(size) || ""

      if buf
        buf.replace(ret)
        ret = buf
      end

      (size && ret.empty?) ? nil : ret
    end
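The last line of the listing encodes the IO#read convention for sized reads: a short final chunk is returned as-is, and nil signals end of file. The sketch below shows that convention with a StringIO from the standard library as a stand-in, on the assumption that a stream follows the same rule; drain_in_chunks is a hypothetical helper.

```ruby
require "stringio"

# Hypothetical helper: read fixed-size chunks until read(size) returns nil,
# keeping the terminating nil so the end-of-file signal is visible.
def drain_in_chunks(io, size)
  results = []
  loop do
    chunk = io.read(size)
    results << chunk
    break if chunk.nil?
  end
  results
end

p drain_in_chunks(StringIO.new("abcde"), 2) # prints ["ab", "cd", "e", nil]
```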
#readchar ⇒ Object
Reads a one-character string from the stream. Raises an EOFError at end of file.
    # File 'lib/celluloid/io/stream.rb', line 224
    def readchar
      raise EOFError if eof?
      getc
    end
#readline(eol = $/) ⇒ Object
Reads a line from the stream which is separated by eol.
Raises EOFError if at end of file.
    # File 'lib/celluloid/io/stream.rb', line 204
    def readline(eol=$/)
      raise EOFError if eof?
      gets(eol)
    end
#readlines(eol = $/) ⇒ Object
Reads lines from the stream which are separated by eol.
See also #gets.
    # File 'lib/celluloid/io/stream.rb', line 191
    def readlines(eol=$/)
      ary = []

      while line = self.gets(eol)
        ary << line
      end

      ary
    end
#readpartial(maxlen, buf = nil) ⇒ Object
Reads at most maxlen bytes from the stream. If buf is provided it must reference a string which will receive the data.
See IO#readpartial for full details.
    # File 'lib/celluloid/io/stream.rb', line 117
    def readpartial(maxlen, buf=nil)
      if maxlen == 0
        if buf
          buf.clear
          return buf
        else
          return ""
        end
      end

      if @read_buffer.empty?
        begin
          return sysread(maxlen, buf)
        rescue Errno::EAGAIN
          retry
        end
      end

      ret = consume_rbuff(maxlen)

      if buf
        buf.replace(ret)
        ret = buf
      end

      raise EOFError if ret.empty?
      ret
    end
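Since the description defers to IO#readpartial, the contract can be illustrated with a plain pipe from the standard library: each call returns whatever is available, up to maxlen bytes, and EOFError marks the end. This sketch assumes a stream behaves the same way; collect_partials is a hypothetical helper.

```ruby
# Hypothetical helper: call readpartial until EOFError, collecting the pieces.
def collect_partials(io, maxlen)
  pieces = []
  loop do
    pieces << io.readpartial(maxlen)
  end
  pieces
rescue EOFError
  pieces
end

reader, writer = IO.pipe
writer.write("abcdef")
writer.close
p collect_partials(reader, 4) # prints ["abcd", "ef"]
reader.close
```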
#sysread(length = nil, buffer = nil) ⇒ Object
System read via the nonblocking subsystem.
    # File 'lib/celluloid/io/stream.rb', line 37
    def sysread(length = nil, buffer = nil)
      buffer ||= ''.force_encoding(Encoding::ASCII_8BIT)

      @read_latch.synchronize do
        begin
          read_nonblock(length, buffer)
        rescue ::IO::WaitReadable
          wait_readable
          retry
        end
      end

      buffer
    end
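The retry-on-WaitReadable pattern in this listing can be exercised without Celluloid. In the sketch below, sketch_sysread is a hypothetical function, IO.select stands in for wait_readable (an assumption; the real method suspends the calling task through the Celluloid::IO reactor), and the latch is left out because only one reader is involved.

```ruby
# Hypothetical stand-in for the listing's read loop, using only core Ruby.
def sketch_sysread(io, length, buffer = nil)
  buffer ||= String.new(encoding: Encoding::ASCII_8BIT)
  begin
    io.read_nonblock(length, buffer)
  rescue IO::WaitReadable
    IO.select([io]) # block until readable, then try again
    retry
  end
  buffer
end

reader, writer = IO.pipe
# The delayed write forces the first read_nonblock to raise IO::WaitReadable.
producer = Thread.new do
  sleep 0.05
  writer.write("hello")
  writer.close
end
p sketch_sysread(reader, 16) # prints "hello"
producer.join
reader.close
```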
#syswrite(string) ⇒ Object
System write via the nonblocking subsystem.
    # File 'lib/celluloid/io/stream.rb', line 53
    def syswrite(string)
      length = string.length
      total_written = 0

      remaining = string

      @write_latch.synchronize do
        while total_written < length
          begin
            written = write_nonblock(remaining)
          rescue ::IO::WaitWritable
            wait_writable
            retry
          rescue EOFError
            return total_written
          rescue Errno::EAGAIN
            wait_writable
            retry
          end

          total_written += written

          # FIXME: mutating the original buffer here. Seems bad.
          remaining.slice!(0, written) if written < remaining.length
        end
      end

      total_written
    end
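The partial-write loop can likewise be tried against a plain pipe. This is a sketch under stated assumptions, not the library's code: sketch_syswrite is a hypothetical function, IO.select stands in for wait_writable, and the remaining data is tracked with byteslice on a binary copy, which is one possible way to avoid the mutation that the FIXME in the listing points out.

```ruby
# Hypothetical stand-in for the listing's write loop, using only core Ruby.
def sketch_syswrite(io, string)
  remaining = string.b # binary copy; the caller's string is left untouched
  total_written = 0
  until remaining.empty?
    begin
      written = io.write_nonblock(remaining)
    rescue IO::WaitWritable
      IO.select(nil, [io]) # block until writable, then try again
      retry
    end
    total_written += written
    remaining = remaining.byteslice(written..-1)
  end
  total_written
end

reader, writer = IO.pipe
payload = "x" * 200_000 # larger than a typical pipe buffer, so writes are partial
consumer = Thread.new { reader.read }
count = sketch_syswrite(writer, payload)
writer.close
received = consumer.value
reader.close
puts count == received.bytesize # prints true
```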
#ungetc(c) ⇒ Object
Pushes character c back onto the stream such that a subsequent buffered character read will return it.
Unlike IO#ungetc multiple bytes may be pushed back onto the stream.
Has no effect on unbuffered reads (such as #sysread).
    # File 'lib/celluloid/io/stream.rb', line 235
    def ungetc(c)
      @read_buffer[0,0] = c.chr
    end
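The one-line body relies on two core String behaviours: assigning to a zero-length slice at index 0 prepends, and chr yields a single character (the first character of a String, or the character for an Integer code). A minimal sketch with a hypothetical PushbackSketch class, whose getc is a simplification rather than the library's read(1):

```ruby
# Hypothetical model of the pushback buffer.
class PushbackSketch
  def initialize(data)
    @read_buffer = String.new(data)
  end

  # Same expression as the listing.
  def ungetc(c)
    @read_buffer[0, 0] = c.chr
  end

  # Simplified: take one character from the buffer, nil when it is empty.
  def getc
    @read_buffer.empty? ? nil : @read_buffer.slice!(0, 1)
  end
end

stream = PushbackSketch.new("c")
stream.ungetc("b")
stream.ungetc(65)    # Integer#chr gives "A"
stream.ungetc("xyz") # String#chr keeps only "x"
p 4.times.map { stream.getc } # prints ["x", "A", "b", "c"]
```

Repeated calls accumulate in last-in, first-out order, while a multi-character argument contributes only its first character.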
#wait_readable ⇒ Object
Wait until the current object is readable.
    # File 'lib/celluloid/io/stream.rb', line 31
    def wait_readable; Celluloid::IO.wait_readable(self); end
#wait_writable ⇒ Object
Wait until the current object is writable.
    # File 'lib/celluloid/io/stream.rb', line 34
    def wait_writable; Celluloid::IO.wait_writable(self); end
#write(s) ⇒ Object
Writes s to the stream. If the argument is not a string it will be converted using String#to_s. Returns the number of bytes written.
    # File 'lib/celluloid/io/stream.rb', line 249
    def write(s)
      do_write(s)
      s.bytesize
    end