Module: DTAS::Buffer::ReadWrite

Defined in:
lib/dtas/buffer/read_write.rb

Overview

compatibility code for systems lacking “splice” support via the “sleepy_penguin” 3.5+ RubyGem. Used only by -player

Constant Summary collapse

MAX_AT_ONCE =

:nodoc:

512

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#buffer_sizeObject

min PIPE_BUF value in POSIX



13
14
15
# File 'lib/dtas/buffer/read_write.rb', line 13

def buffer_size
  @buffer_size
end

Instance Method Details

#_rbufObject



15
16
17
# File 'lib/dtas/buffer/read_write.rb', line 15

def _rbuf
  Thread.current[:dtas_pbuf] ||= ''.b
end

#broadcast_inf(targets, limit = nil) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/dtas/buffer/read_write.rb', line 45

def broadcast_inf(targets, limit = nil)
  nr_nb = targets.count(&:nonblock?)
  if nr_nb == 0 || nr_nb == targets.size
    # if all targets are full, don't start until they're all writable
    r = IO.select(nil, targets, nil, 0) or return targets
    blocked = targets - r[1]

    # tell DTAS::UNIXServer#run_once to wait on the blocked targets
    return blocked if blocked[0]

    # all writable, yay!
  else
    blocked = []
  end

  again = {}

  # don't pin too much on one target
  bytes = inflight
  limit ||= MAX_AT_ONCE
  bytes = bytes > limit ? limit : bytes
  buf = _rbuf
  @to_io.read(bytes, buf)
  n = buf.bytesize
  @bytes_xfer += n

  targets.delete_if do |dst|
    begin
      if dst.nonblock?
        case w = dst.write_nonblock(buf, exception: false)
        when :wait_writable
          blocked << dst
        else
          again[dst] = buf.byteslice(w, n) if w < n
        end
      else
        dst.write(buf)
      end
      false
    rescue IOError, Errno::EPIPE => e
      again.delete(dst)
      __dst_error(dst, e)
      true
    end
  end

  # try to write as much as possible
  again.delete_if do |dst, sbuf|
    begin
      case w = dst.write_nonblock(sbuf, exception: false)
      when :wait_writable
        blocked << dst
        true
      else
        n = sbuf.bytesize
        if w < n
          again[dst] = sbuf.byteslice(w, n)
          false
        else
          true
        end
      end
    rescue IOError, Errno::EPIPE => e
      __dst_error(dst, e)
      true
    end
  end until again.empty?
  targets[0] ? :wait_readable : nil
end

#broadcast_one(targets, limit = nil) ⇒ Object

always block when we have a single target



31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/dtas/buffer/read_write.rb', line 31

def broadcast_one(targets, limit = nil)
  buf = _rbuf
  case rv = @to_io.read_nonblock(limit || MAX_AT_ONCE, buf, exception: false)
  when nil, :wait_readable then return rv
  end
  n = targets[0].write(buf) # IO#write has write-in-full behavior
  @bytes_xfer += n
  :wait_readable
rescue Errno::EPIPE, IOError => e
  __dst_error(targets[0], e)
  targets.clear
  nil # do not return error here, we already spewed an error message
end

#discard(bytes) ⇒ Object

be sure to only call this with nil when all writers to @wr are done



20
21
22
23
24
25
26
27
28
# File 'lib/dtas/buffer/read_write.rb', line 20

def discard(bytes)
  buf = _rbuf
  begin
    @to_io.readpartial(bytes, buf)
    bytes -= buf.bytesize
  rescue EOFError
    return
  end until bytes == 0
end