Module: DTAS::Buffer::ReadWrite
- Defined in:
- lib/dtas/buffer/read_write.rb
Overview
:nodoc:
Constant Summary collapse
- MAX_AT_ONCE =
Minimum PIPE_BUF value guaranteed by POSIX.
512
Instance Attribute Summary collapse
-
#buffer_size ⇒ Object
Returns the value of attribute buffer_size.
Instance Method Summary collapse
- #_rbuf ⇒ Object
- #broadcast_inf(targets, bytes) ⇒ Object
-
#broadcast_one(targets, bytes) ⇒ Object
always block when we have a single target.
-
#discard(bytes) ⇒ Object
Pass nil for bytes only when all writers to @wr are done.
Instance Attribute Details
#buffer_size ⇒ Object
Returns the value of attribute buffer_size.
11 12 13 |
# File 'lib/dtas/buffer/read_write.rb', line 11
#
# Reader for the buffer_size attribute.
#
# @return [Object] whatever is currently stored in @buffer_size
def buffer_size
  @buffer_size
end
Instance Method Details
#_rbuf ⇒ Object
13 14 15 |
# File 'lib/dtas/buffer/read_write.rb', line 13
#
# Scratch String used as the destination buffer for reads in this module.
# It is created on first use and stored under the :dtas_pbuf key of
# Thread.current, so later calls from the same thread get the same object.
#
# @return [String] the memoized buffer for the calling thread
def _rbuf
  current = Thread.current
  current[:dtas_pbuf] ||= ""
end
#broadcast_inf(targets, bytes) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/dtas/buffer/read_write.rb', line 39
#
# Reads one chunk (at most MAX_AT_ONCE bytes) from @to_io and writes it to
# every target. Targets answering true to nonblock? are written with
# write_nonblock and retried until the whole chunk is written or the target
# raises EAGAIN; all other targets are written with the blocking write.
# Targets that raise IOError or Errno::EPIPE are reported through
# __dst_error and removed from +targets+ (the array is mutated in place).
#
# @param targets [Array] sinks responding to nonblock?, write and
#   write_nonblock; they are handed to IO.select when they are all
#   blocking or all non-blocking
# @param bytes [Integer] upper bound on the number of bytes to read
# @return [Array] the targets to wait on, when the targets are all blocking
#   or all non-blocking and at least one of them is not writable yet
#   (nothing is read in that case)
# @return [Symbol] :wait_readable when at least one target remains
# @return [nil] when no target remains
def broadcast_inf(targets, bytes)
  nr_nb = targets.count { |sink| sink.nonblock? }
  if nr_nb == 0 || nr_nb == targets.size
    # if all targets are full, don't start until they're all writable
    ready = IO.select(nil, targets, nil, 0) or return targets
    blocked = targets - ready[1]

    # tell DTAS::UNIXServer#run_once to wait on the blocked targets
    return blocked if blocked[0]

    # all writable, yay!
  else
    blocked = []
  end

  # non-blocking targets that accepted only part of the chunk => unwritten tail
  again = {}

  # don't pin too much on one target
  bytes = MAX_AT_ONCE if bytes > MAX_AT_ONCE
  buf = _rbuf
  @to_io.read(bytes, buf)

  # Size of the chunk just read.  It must be captured here: the partial-write
  # check in the loop below compares against it.
  n = buf.bytesize
  @bytes_xfer += n

  targets.delete_if do |dst|
    begin
      if dst.nonblock?
        w = dst.write_nonblock(buf)
        # byteslice copies, so the shared read buffer can be reused safely
        again[dst] = buf.byteslice(w, n) if w < n
      else
        dst.write(buf)
      end
      false
    rescue Errno::EAGAIN
      # NOTE(review): from here on +blocked+ is only appended to; it is not
      # part of this method's return value.
      blocked << dst
      false
    rescue IOError, Errno::EPIPE => e
      again.delete(dst)
      __dst_error(dst, e)
      true
    end
  end

  # try to write as much as possible
  begin
    again.delete_if do |dst, sbuf|
      begin
        w = dst.write_nonblock(sbuf)
        left = sbuf.bytesize
        if w < left
          # still partial: keep the entry, shrunk to the unwritten tail
          again[dst] = sbuf.byteslice(w, left)
          false
        else
          true
        end
      rescue Errno::EAGAIN
        # the target stopped accepting data; the unwritten tail is dropped
        blocked << dst
        true
      rescue IOError, Errno::EPIPE => e
        __dst_error(dst, e)
        true
      end
    end
  end until again.empty?

  targets[0] ? :wait_readable : nil
end
#broadcast_one(targets, bytes) ⇒ Object
always block when we have a single target
27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/dtas/buffer/read_write.rb', line 27
#
# Single-target broadcast: always uses the blocking write.
#
# Reads up to +bytes+ bytes from @to_io into the shared read buffer, writes
# them to the first target and adds the written byte count to @bytes_xfer.
#
# @param targets [Array] only the first element is written to; the array is
#   cleared if the transfer raises Errno::EPIPE or IOError
# @param bytes [Integer] maximum number of bytes to read
# @return [Symbol] :wait_readable after a successful write
# @return [nil] after an Errno::EPIPE or IOError was reported
def broadcast_one(targets, bytes)
  begin
    chunk = _rbuf
    @to_io.read(bytes, chunk)
    # IO#write has write-in-full behavior
    written = targets.first.write(chunk)
    @bytes_xfer += written
    return :wait_readable
  rescue Errno::EPIPE, IOError => err
    __dst_error(targets.first, err)
    targets.clear
  end

  # do not return error here, we already spewed an error message
  nil
end
#discard(bytes) ⇒ Object
Pass nil for bytes only when all writers to @wr are done.
18 19 20 21 22 23 24 |
# File 'lib/dtas/buffer/read_write.rb', line 18
#
# Reads and throws away data from @to_io.
#
# be sure to only call this with nil when all writers to @wr are done
#
# @param bytes [Integer, nil] number of bytes to drop, or nil to drop
#   everything up to EOF
# @return [nil]
def discard(bytes)
  buf = _rbuf

  if bytes.nil?
    # read(nil) consumes everything up to EOF in one call and returns a
    # String, so there is no remaining-byte count to maintain.
    @to_io.read(nil, buf)
    return
  end

  loop do
    @to_io.read(bytes, buf) or break # EOF
    bytes -= buf.bytesize
    break if bytes == 0
  end
  nil
end