Module: DTAS::Buffer::Splice

Defined in:
lib/dtas/buffer/splice.rb

Overview

Used by dtas-player on Linux systems with the “sleepy_penguin” RubyGem installed

Constant Summary collapse

MAX_AT_ONCE =

:nodoc:

4096
MAX_AT_ONCE_1 =

page size in Linux

65536
F_MOVE =
SleepyPenguin::F_MOVE
F_NONBLOCK =
SleepyPenguin::F_NONBLOCK

Instance Method Summary collapse

Instance Method Details

#__broadcast_tee(blocked, targets, chunk_size) ⇒ Object

returns the largest value we teed



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/dtas/buffer/splice.rb', line 69

# Tees data from @to_io into every destination in +targets+.
#
# blocked::    array that receives each destination whose tee call did
#              not yield an Integer byte count
# targets::    destinations to tee into; any destination that raises
#              IOError or Errno::EPIPE is reported through __dst_error
#              and removed from this array in place
# chunk_size:: maximum number of bytes to request per tee call
#
# Returns the largest byte count any single tee reported (0 if none did).
def __broadcast_tee(blocked, targets, chunk_size)
  largest = 0
  targets.delete_if do |dst|
    begin
      # Until something has been teed (or whenever dst is non-blocking)
      # use a single F_NONBLOCK tee; otherwise tee the whole chunk.
      teed =
        if dst.nonblock? || largest == 0
          SleepyPenguin.tee(@to_io, dst, chunk_size, F_NONBLOCK,
                            exception: false)
        else
          __tee_in_full(@to_io, dst, chunk_size)
        end

      case teed
      when Integer
        if teed > largest
          # the first successful tee caps the size of all later requests
          chunk_size = teed if largest == 0
          largest = teed
        end
      else
        # no byte count came back; let the caller wait on this target
        blocked << dst
      end
      false # keep dst in targets
    rescue IOError, Errno::EPIPE => err
      __dst_error(dst, err)
      true # drop the failed dst from targets
    end
  end
  largest
end

#__splice_in_full(src, dst, bytes, flags) ⇒ Object



58
59
60
61
62
63
64
65
66
# File 'lib/dtas/buffer/splice.rb', line 58

# Calls SleepyPenguin.splice(src, dst, remaining, flags) repeatedly until
# +bytes+ bytes have been accounted for.
#
# Returns the sum of the counts reported by the individual splice calls
# (0 when +bytes+ is not positive, in which case splice is never called).
# Exceptions raised by SleepyPenguin.splice are not rescued here.
def __splice_in_full(src, dst, bytes, flags)
  remaining = bytes
  total = 0
  until remaining <= 0
    moved = SleepyPenguin.splice(src, dst, remaining, flags)
    total += moved
    remaining -= moved
  end
  total
end

#__tee_in_full(src, dst, bytes) ⇒ Object



48
49
50
51
52
53
54
55
56
# File 'lib/dtas/buffer/splice.rb', line 48

# Calls SleepyPenguin.tee(src, dst, remaining) repeatedly until +bytes+
# bytes have been accounted for.
#
# Returns the sum of the counts reported by the individual tee calls
# (0 when +bytes+ is not positive, in which case tee is never called).
# Exceptions raised by SleepyPenguin.tee are not rescued here.
def __tee_in_full(src, dst, bytes)
  remaining = bytes
  total = 0
  until remaining <= 0
    copied = SleepyPenguin.tee(src, dst, remaining)
    remaining -= copied
    total += copied
  end
  total
end

#broadcast_inf(targets, limit = nil) ⇒ Object



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/dtas/buffer/splice.rb', line 94

# Broadcasts buffered data from @to_io to several targets: every target
# except the last one is fed through __broadcast_tee, and the last one is
# fed with SleepyPenguin.splice.
#
# targets:: array of destinations; it is mutated here (the last element is
#           popped and pushed back, and destinations that fail with
#           IOError/Errno::EPIPE are removed)
# limit::   optional cap on the bytes requested per call; MAX_AT_ONCE is
#           used when it is nil
#
# Returns one of:
# * an array of destinations to wait on for writability (+targets+ itself
#   when IO.select reports nothing writable, otherwise +blocked+)
# * :wait_readable after data was moved or discarded
# * nil when the last target errored out and no targets remain
#
# NOTE(review): assumes +targets+ is non-empty (presumably the caller uses
# broadcast_one for a single target) -- confirm against the caller.
def broadcast_inf(targets, limit = nil)
  if targets.all?(&:ready_write_optimized?)
    blocked = []
  elsif targets.none?(&:nonblock?)
    # if all targets are blocking, don't start until they're all writable
    # (zero timeout: IO.select only polls here and returns nil if none are)
    r = IO.select(nil, targets, nil, 0) or return targets
    blocked = targets - r[1]

    # tell DTAS::UNIXServer#run_once to wait on the blocked targets
    return blocked if blocked[0]

    # all writable, yay!
  else
    blocked = []
  end

  # don't pin too much on one target
  bytes = limit || MAX_AT_ONCE
  last = targets.pop # we splice to the last one, tee to the rest

  # this may return zero if all targets were non-blocking
  most_teed = __broadcast_tee(blocked, targets, bytes)

  # don't splice more than the largest amount we successfully teed
  bytes = most_teed if most_teed > 0

  begin
    # restore last to the list; the rescue below pops it again on failure
    targets << last
    if last.nonblock? || most_teed == 0
      s = SleepyPenguin.splice(@to_io, last, bytes, F_MOVE|F_NONBLOCK,
                               exception: false)
      # a Symbol result means the splice moved nothing and last is blocked
      if Symbol === s
        blocked << last

        # we accomplished nothing!
        # If _all_ writers are blocked, do not discard data,
        # stay blocked on :wait_writable
        return blocked if most_teed == 0

        # the tees targets win, drop data intended for last
        if most_teed > 0
          discard(most_teed)
          @bytes_xfer += most_teed
          # do not watch for writability of last, last is non-blocking
          return :wait_readable
        end
      end
    else
      # the blocking case is simple
      s = __splice_in_full(@to_io, last, bytes, F_MOVE)
    end
    @bytes_xfer += s

    # if we can't splice everything
    # discard it so the early targets do not get repeated data
    if s < bytes && most_teed > 0
      discard(bytes - s)
    end
    :wait_readable
  rescue IOError, Errno::EPIPE => e # last failed, drop it
    __dst_error(last, e)
    targets.pop # we're no longer a valid target

    if most_teed == 0
      # nothing accomplished, watch any targets
      return blocked if blocked[0]
    else
      # some progress, discard the data we could not splice
      @bytes_xfer += most_teed
      discard(most_teed)
    end

    # stop decoding if we're completely errored out
    # returning nil will trigger close
    return targets[0] ? :wait_readable : nil
  end
end

#broadcast_one(targets, limit = nil) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/dtas/buffer/splice.rb', line 31

# Splices data from @to_io to the single destination targets[0] using
# F_MOVE|F_NONBLOCK and exception: false.
#
# targets:: array whose first element is the destination
# limit::   optional cap on the bytes requested; MAX_AT_ONCE_1 when nil
#
# Returns +targets+ when the splice result is a Symbol (destination
# blocked on write), :wait_readable after adding the spliced byte count
# to @bytes_xfer, or nil when Errno::EPIPE/IOError was raised (the error
# is passed to __dst_error and +targets+ is emptied).
def broadcast_one(targets, limit = nil)
  # single output is always non-blocking
  max = limit || MAX_AT_ONCE_1
  moved = SleepyPenguin.splice(@to_io, targets[0], max, F_MOVE|F_NONBLOCK,
                               exception: false)
  case moved
  when Symbol
    # our one and only target blocked on write
    return targets
  end

  @bytes_xfer += moved
  :wait_readable # we want to read more from @to_io soon
rescue Errno::EPIPE, IOError => err
  __dst_error(targets[0], err)
  targets.clear
  nil # do not return error here, we already spewed an error message
end

#buffer_size ⇒ Object



16
17
18
# File 'lib/dtas/buffer/splice.rb', line 16

# Returns the value of @to_io.pipe_size.
def buffer_size
  pipe = @to_io
  pipe.pipe_size
end

#buffer_size=(bytes) ⇒ Object

nil is OK, won’t reset existing pipe, either…



21
22
23
24
# File 'lib/dtas/buffer/splice.rb', line 21

# Records +bytes+ in @buffer_size and, when +bytes+ is truthy, also assigns
# it to @to_io.pipe_size. A nil value leaves the pipe's size untouched.
def buffer_size=(bytes)
  if bytes
    @to_io.pipe_size = bytes
  end
  @buffer_size = bytes
end

#discard(bytes) ⇒ Object

be sure to only call this with nil when all writers to @wr are done



27
28
29
# File 'lib/dtas/buffer/splice.rb', line 27

# Splices +bytes+ from @to_io into DTAS.null and returns whatever
# SleepyPenguin.splice returns.
# NOTE(review): DTAS.null is presumably a /dev/null-style sink, so the
# data is thrown away -- confirm against its definition.
def discard(bytes)
  sink = DTAS.null
  SleepyPenguin.splice(@to_io, sink, bytes)
end