Class: Celluloid::IO::Stream

Inherits:
Socket
  • Object
show all
Includes:
Enumerable
Defined in:
lib/celluloid/io/stream.rb

Overview

Base class of all streams in Celluloid::IO

Direct Known Subclasses

SSLSocket, TCPSocket, UNIXSocket

Defined Under Namespace

Classes: Latch

Constant Summary

Constants inherited from Socket

Celluloid::IO::Socket::Constants

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Socket

new, #to_io, try_convert

Constructor Details

#initialize(socket) ⇒ Stream

Returns a new instance of Stream.



19
20
21
22
23
24
25
26
27
28
# File 'lib/celluloid/io/stream.rb', line 19

def initialize(socket)
  super
  @eof  = false
  @sync = true
  @read_buffer = ''.force_encoding(Encoding::ASCII_8BIT)
  @write_buffer = ''.force_encoding(Encoding::ASCII_8BIT)

  @read_latch  = Latch.new
  @write_latch = Latch.new
end

Instance Attribute Details

#syncObject

The “sync mode” of the stream

See IO#sync for full details.



17
18
19
# File 'lib/celluloid/io/stream.rb', line 17

def sync
  @sync
end

Instance Method Details

#<<(s) ⇒ Object

Writes s to the stream. s will be converted to a String using String#to_s.



256
257
258
259
# File 'lib/celluloid/io/stream.rb', line 256

def << (s)
  do_write(s)
  self
end

#closeObject

Closes the stream and flushes any unwritten data.



311
312
313
314
# File 'lib/celluloid/io/stream.rb', line 311

def close
  flush rescue nil
  super
end

#each(eol = $/) ⇒ Object Also known as: each_line

Executes the block for every line in the stream where lines are separated by eol.

See also #gets



181
182
183
184
185
# File 'lib/celluloid/io/stream.rb', line 181

def each(eol=$/)
  while line = self.gets(eol)
    yield line
  end
end

#each_byteObject

Calls the given block once for each byte in the stream.



216
217
218
219
220
# File 'lib/celluloid/io/stream.rb', line 216

def each_byte # :yields: byte
  while c = getc
    yield(c.ord)
  end
end

#eof?Boolean Also known as: eof

Returns true if the stream is at file which means there is no more data to be read.

Returns:

  • (Boolean)


241
242
243
244
# File 'lib/celluloid/io/stream.rb', line 241

def eof?
  fill_rbuff if !@eof && @read_buffer.empty?
  @eof && @read_buffer.empty?
end

#flushObject

Flushes buffered data to the stream.



301
302
303
304
305
306
307
308
# File 'lib/celluloid/io/stream.rb', line 301

def flush
  osync = @sync
  @sync = true
  do_write ""
  return self
ensure
  @sync = osync
end

#getcObject

Reads one character from the stream. Returns nil if called at end of file.



211
212
213
# File 'lib/celluloid/io/stream.rb', line 211

def getc
  read(1)
end

#gets(eol = $/, limit = nil) ⇒ Object

Reads the next line from the stream. Lines are separated by eol. If limit is provided the result will not be longer than the given number of bytes.

eol may be a String or Regexp.

Unlike IO#gets the line read will not be assigned to $_.

Unlike IO#gets the separator must be provided if a limit is provided.



155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/celluloid/io/stream.rb', line 155

def gets(eol=$/, limit=nil)
  idx = @read_buffer.index(eol)

  until @eof
    break if idx
    fill_rbuff
    idx = @read_buffer.index(eol)
  end

  if eol.is_a?(Regexp)
    size = idx ? idx+$&.size : nil
  else
    size = idx ? idx+eol.size : nil
  end

  if limit and limit >= 0
    size = [size, limit].min
  end

  consume_rbuff(size)
end

Writes args to the stream.

See IO#print for full details.



284
285
286
287
288
289
# File 'lib/celluloid/io/stream.rb', line 284

def print(*args)
  s = ""
  args.each { |arg| s << arg.to_s }
  do_write(s)
  nil
end

#printf(s, *args) ⇒ Object

Formats and writes to the stream converting parameters under control of the format string.

See Kernel#sprintf for format string details.



295
296
297
298
# File 'lib/celluloid/io/stream.rb', line 295

def printf(s, *args)
  do_write(s % args)
  nil
end

#puts(*args) ⇒ Object

Writes args to the stream along with a record separator.

See IO#puts for full details.



264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
# File 'lib/celluloid/io/stream.rb', line 264

def puts(*args)
  s = ""
  if args.empty?
    s << "\n"
  end

  args.each do |arg|
    s << arg.to_s
    if $/ && /\n\z/ !~ s
      s << "\n"
    end
  end

  do_write(s)
  nil
end

#read(size = nil, buf = nil) ⇒ Object

Reads size bytes from the stream. If buf is provided it must reference a string which will receive the data.

See IO#read for full details.



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/celluloid/io/stream.rb', line 87

def read(size=nil, buf=nil)
  if size == 0
    if buf
      buf.clear
      return buf
    else
      return ""
    end
  end

  until @eof
    break if size && size <= @read_buffer.size
    fill_rbuff
    break unless size
  end

  ret = consume_rbuff(size) || ""

  if buf
    buf.replace(ret)
    ret = buf
  end

  (size && ret.empty?) ? nil : ret
end

#readcharObject

Reads a one-character string from the stream. Raises an EOFError at end of file.

Raises:

  • (EOFError)


224
225
226
227
# File 'lib/celluloid/io/stream.rb', line 224

def readchar
  raise EOFError if eof?
  getc
end

#readline(eol = $/) ⇒ Object

Reads a line from the stream which is separated by eol.

Raises EOFError if at end of file.

Raises:

  • (EOFError)


204
205
206
207
# File 'lib/celluloid/io/stream.rb', line 204

def readline(eol=$/)
  raise EOFError if eof?
  gets(eol)
end

#readlines(eol = $/) ⇒ Object

Reads lines from the stream which are separated by eol.

See also #gets



191
192
193
194
195
196
197
198
199
# File 'lib/celluloid/io/stream.rb', line 191

def readlines(eol=$/)
  ary = []

  while line = self.gets(eol)
    ary << line
  end

  ary
end

#readpartial(maxlen, buf = nil) ⇒ Object

Reads at most maxlen bytes from the stream. If buf is provided it must reference a string which will receive the data.

See IO#readpartial for full details.

Raises:

  • (EOFError)


117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/celluloid/io/stream.rb', line 117

def readpartial(maxlen, buf=nil)
  if maxlen == 0
    if buf
      buf.clear
      return buf
    else
      return ""
    end
  end

  if @read_buffer.empty?
    begin
      return sysread(maxlen, buf)
    rescue Errno::EAGAIN
      retry
    end
  end

  ret = consume_rbuff(maxlen)

  if buf
    buf.replace(ret)
    ret = buf
  end

  raise EOFError if ret.empty?
  ret
end

#sysread(length = nil, buffer = nil) ⇒ Object

System read via the nonblocking subsystem



37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/celluloid/io/stream.rb', line 37

def sysread(length = nil, buffer = nil)
  buffer ||= ''.force_encoding(Encoding::ASCII_8BIT)

  @read_latch.synchronize do
    begin
      read_nonblock(length, buffer)
    rescue ::IO::WaitReadable
      wait_readable
      retry
    end
  end

  buffer
end

#syswrite(string) ⇒ Object

System write via the nonblocking subsystem



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/celluloid/io/stream.rb', line 53

def syswrite(string)
  length = string.length
  total_written = 0

  remaining = string

  @write_latch.synchronize do
    while total_written < length
      begin
        written = write_nonblock(remaining)
      rescue ::IO::WaitWritable
        wait_writable
        retry
      rescue EOFError
        return total_written
      rescue Errno::EAGAIN
        wait_writable
        retry
      end

      total_written += written

      # FIXME: mutating the original buffer here. Seems bad.
      remaining.slice!(0, written) if written < remaining.length
    end
  end

  total_written
end

#ungetc(c) ⇒ Object

Pushes character c back onto the stream such that a subsequent buffered character read will return it.

Unlike IO#getc multiple bytes may be pushed back onto the stream.

Has no effect on unbuffered reads (such as #sysread).



235
236
237
# File 'lib/celluloid/io/stream.rb', line 235

def ungetc(c)
  @read_buffer[0,0] = c.chr
end

#wait_readableObject

Wait until the current object is readable



31
# File 'lib/celluloid/io/stream.rb', line 31

def wait_readable; Celluloid::IO.wait_readable(self); end

#wait_writableObject

Wait until the current object is writable



34
# File 'lib/celluloid/io/stream.rb', line 34

def wait_writable; Celluloid::IO.wait_writable(self); end

#write(s) ⇒ Object

Writes s to the stream. If the argument is not a string it will be converted using String#to_s. Returns the number of bytes written.



249
250
251
252
# File 'lib/celluloid/io/stream.rb', line 249

def write(s)
  do_write(s)
  s.bytesize
end