Module: DTAS::Buffer::ReadWrite
- Defined in:
- lib/dtas/buffer/read_write.rb
Overview
Compatibility code for systems lacking “splice” support via the “io-splice” RubyGem. Used only by dtas-player.
Constant Summary collapse
- MAX_AT_ONCE =
:nodoc:
512
Instance Attribute Summary collapse
-
#buffer_size ⇒ Object
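Buffer size attribute. (The note “min PIPE_BUF value in POSIX” appears to describe the MAX_AT_ONCE constant, 512, rather than this attribute.)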
Instance Method Summary collapse
- #_rbuf ⇒ Object
- #broadcast_inf(targets, limit = nil) ⇒ Object
-
#broadcast_one(targets, limit = nil) ⇒ Object
always block when we have a single target.
-
#discard(bytes) ⇒ Object
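Be sure to only call this with nil when all writers to @wr are done. (Note: the implementation listed below does integer arithmetic on its argument, so nil does not appear to be supported here.)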
Instance Attribute Details
#buffer_size ⇒ Object
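Buffer size attribute. (The note “min PIPE_BUF value in POSIX” appears to describe the MAX_AT_ONCE constant, 512, rather than this attribute.)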
13 14 15 |
# File 'lib/dtas/buffer/read_write.rb', line 13

# Reader for the @buffer_size instance variable.
#
# Returns whatever is currently stored in @buffer_size (nil if it was
# never assigned).
def buffer_size
  @buffer_size
end
Instance Method Details
#_rbuf ⇒ Object
15 16 17 |
# File 'lib/dtas/buffer/read_write.rb', line 15

# Returns the reusable binary scratch string kept under
# Thread.current[:dtas_pbuf], creating an empty one on first use so that
# repeated reads can share one buffer instead of allocating each time.
def _rbuf
  current = Thread.current
  current[:dtas_pbuf] ||= ''.b
end
#broadcast_inf(targets, limit = nil) ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/dtas/buffer/read_write.rb', line 45 def broadcast_inf(targets, limit = nil) nr_nb = targets.count(&:nonblock?) if nr_nb == 0 || nr_nb == targets.size # if all targets are full, don't start until they're all writable r = IO.select(nil, targets, nil, 0) or return targets blocked = targets - r[1] # tell DTAS::UNIXServer#run_once to wait on the blocked targets return blocked if blocked[0] # all writable, yay! else blocked = [] end again = {} # don't pin too much on one target bytes = inflight limit ||= MAX_AT_ONCE bytes = bytes > limit ? limit : bytes buf = _rbuf @to_io.read(bytes, buf) n = buf.bytesize @bytes_xfer += n targets.delete_if do |dst| begin if dst.nonblock? case w = dst.write_nonblock(buf, exception: false) when :wait_writable blocked << dst else again[dst] = buf.byteslice(w, n) if w < n end else dst.write(buf) end false rescue IOError, Errno::EPIPE => e again.delete(dst) __dst_error(dst, e) true end end # try to write as much as possible again.delete_if do |dst, sbuf| begin case w = dst.write_nonblock(sbuf, exception: false) when :wait_writable blocked << dst true else n = sbuf.bytesize if w < n again[dst] = sbuf.byteslice(w, n) false else true end end rescue IOError, Errno::EPIPE => e __dst_error(dst, e) true end end until again.empty? targets[0] ? :wait_readable : nil end |
#broadcast_one(targets, limit = nil) ⇒ Object
always block when we have a single target
31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/dtas/buffer/read_write.rb', line 31

# Copies one chunk from @to_io to the single target in +targets+.
#
# targets - Array whose first element is the destination IO; it is
#           cleared if writing fails.
# limit   - maximum bytes to read; MAX_AT_ONCE is used when nil.
#
# Returns nil or :wait_readable straight from the nonblocking read when
# no data was obtained, :wait_readable after a successful write, and nil
# when Errno::EPIPE or IOError was raised (the error is reported through
# __dst_error rather than propagated).
def broadcast_one(targets, limit = nil)
  chunk = _rbuf
  max_len = limit || MAX_AT_ONCE
  status = @to_io.read_nonblock(max_len, chunk, exception: false)

  # EOF (nil) or nothing available yet: hand the read result back as-is
  return status if status.nil? || status == :wait_readable

  # the write below is blocking; IO#write has write-in-full behavior
  written = targets[0].write(chunk)
  @bytes_xfer += written
  :wait_readable
rescue Errno::EPIPE, IOError => e
  __dst_error(targets[0], e)
  targets.clear
  nil # do not return error here, we already spewed an error message
end
#discard(bytes) ⇒ Object
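Be sure to only call this with nil when all writers to @wr are done. (Note: the implementation listed below does integer arithmetic on its argument, so nil does not appear to be supported here.)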
20 21 22 23 24 25 26 27 28 |
# File 'lib/dtas/buffer/read_write.rb', line 20

# Reads +bytes+ bytes from @to_io into the shared scratch buffer and
# throws them away, stopping early if EOFError is raised.
#
# bytes - number of bytes to drop.
#
# Always returns nil.
#
# NOTE(review): the published summary talks about calling this with nil,
# but this implementation subtracts from +bytes+ -- confirm that nil is
# never passed to this variant.
def discard(bytes)
  scratch = _rbuf
  # always attempt at least one read, then keep going until the
  # requested count has been consumed
  loop do
    @to_io.readpartial(bytes, scratch)
    bytes -= scratch.bytesize
    break if bytes == 0
  end
rescue EOFError
  # source is exhausted; nothing more to discard
  nil
end