Class: Async::HTTP::Protocol::HTTP2::Stream::Buffer

Inherits:
Object
  • Object
show all
Defined in:
lib/async/http/protocol/http2/stream.rb

Instance Method Summary collapse

Constructor Details

#initialize(stream, body, task: Task.current) ⇒ Buffer

Returns a new instance of Buffer.



29
30
31
32
33
34
35
36
37
38
# File 'lib/async/http/protocol/http2/stream.rb', line 29

# Capture the stream and body, reset the buffered remainder, and start a
# child task that runs {#passthrough}.
#
# @param stream [Object] kept in `@stream`; the other methods call `send_data` and `available_frame_size` on it.
# @param body [Object] kept in `@body`; the other methods call `read` and `close` on it.
# @param task [Object] parent task whose `async` is invoked with {#passthrough}; defaults to `Task.current`.
def initialize(stream, body, task: Task.current)
  @stream, @body = stream, body
  
  # Nothing left over from a previous partial send yet.
  @remainder = nil
  @window_updated = Async::Condition.new
  
  # Kept last: passthrough reads the instance variables assigned above.
  @task = task.async(&method(:passthrough))
end

Instance Method Details

#close(error) ⇒ Object



100
101
102
103
104
105
106
107
# File 'lib/async/http/protocol/http2/stream.rb', line 100

# Close the body (if one is still held) with `error`, forget it, and stop the
# background task if there is one.
#
# @param error [Object, nil] passed through to `@body.close`.
# @return [Object, nil] the result of `@task.stop`, or nil when `@task` is nil.
def close(error)
  if (body = @body)
    body.close(error)
    # Clear the reference only after a successful close, so a second call is a no-op for the body.
    @body = nil
  end
  
  @task&.stop
end

#end_stream ⇒ Object



92
93
94
# File 'lib/async/http/protocol/http2/stream.rb', line 92

# Send a data frame with no payload (nil) and the `END_STREAM` flag on the stream.
#
# @return [Object] whatever `@stream.send_data` returns.
def end_stream
  flags = ::Protocol::HTTP2::END_STREAM
  
  @stream.send_data(nil, flags)
end

#passthrough(task) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/async/http/protocol/http2/stream.rb', line 40

# Pump the body into the stream: each chunk returned by {#read} is handed to
# {#send_data} together with the stream's currently available frame size.
# Once {#read} returns a falsy value, {#end_stream} is called.
#
# `Async::Stop` is rescued and ignored. In every case the body (if still set)
# is closed with the in-flight exception `$!` and then cleared.
#
# @param task [Object] not used by this method.
def passthrough(task)
  while (chunk = read)
    # Wait on the condition until the stream reports a positive frame size.
    until (available = @stream.available_frame_size) > 0
      @window_updated.wait
    end
    
    send_data(chunk, available)
  end
  
  end_stream
rescue Async::Stop
  # Being stopped is not an error here; fall through to the cleanup below.
ensure
  @body&.close($!)
  @body = nil
end

#push(chunk) ⇒ Object



72
73
74
# File 'lib/async/http/protocol/http2/stream.rb', line 72

# Store `chunk` as the pending remainder, replacing any previously stored one.
# The next call to #read returns this value before consulting the body.
#
# @param chunk [Object] the data to hand back on the next #read.
# @return [Object] the stored chunk.
def push(chunk)
  @remainder = chunk
end

#read ⇒ Object



61
62
63
64
65
66
67
68
69
70
# File 'lib/async/http/protocol/http2/stream.rb', line 61

# Return the next chunk to send. A remainder stored by {#push} takes priority
# and is cleared as it is returned; otherwise the chunk comes from `@body.read`.
#
# @return [Object, nil] the pending remainder, or whatever `@body.read` returns.
def read
  return @body.read unless @remainder
  
  # Take the leftover and reset the slot in one step.
  pending, @remainder = @remainder, nil
  pending
end

#send_data(chunk, maximum_size) ⇒ Object

Send `maximum_size` bytes of data using the specified `stream`. If the buffer has no more chunks, `END_STREAM` will be sent on the final chunk.

Parameters:

  • maximum_size (Integer)

    send up to this many bytes of data.

  • stream (Stream)

    the stream to use for sending data frames.



79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/async/http/protocol/http2/stream.rb', line 79

# Send at most `maximum_size` bytes of `chunk` on the stream. When the chunk
# is larger than that, only its first `maximum_size` bytes are sent and the
# rest is stored with {#push} so that the next {#read} returns it.
#
# @param chunk [String] the data to send; sliced by bytes, not characters.
# @param maximum_size [Integer] send up to this many bytes of data.
# @return [Object] the result of `@stream.send_data` when the whole chunk fits, otherwise the result of {#push}.
def send_data(chunk, maximum_size)
  total = chunk.bytesize
  
  # Whole chunk fits: send it as-is.
  return @stream.send_data(chunk, maximum_size: maximum_size) if total <= maximum_size
  
  head = chunk.byteslice(0, maximum_size)
  @stream.send_data(head, maximum_size: maximum_size)
  
  # The window was not big enough to send all the data, so we save it for next time:
  tail = chunk.byteslice(maximum_size, total - maximum_size)
  push(tail)
end

#window_updated(size) ⇒ Object



96
97
98
# File 'lib/async/http/protocol/http2/stream.rb', line 96

# Signal the `@window_updated` condition, on which #passthrough waits while
# the stream's available frame size is not positive.
#
# @param size [Object] not used by this method (presumably the window increment — confirm against the caller).
def window_updated(size)
  @window_updated.signal
end