Class: Async::HTTP::Protocol::HTTP2::Stream::Buffer
- Inherits:
-
Object
- Object
- Async::HTTP::Protocol::HTTP2::Stream::Buffer
- Defined in:
- lib/async/http/protocol/http2/stream.rb
Instance Method Summary collapse
- #close(error) ⇒ Object
- #end_stream ⇒ Object
-
#initialize(stream, body, task: Task.current) ⇒ Buffer
constructor
A new instance of Buffer.
- #passthrough(task) ⇒ Object
- #push(chunk) ⇒ Object
- #read ⇒ Object
-
#send_data(chunk, maximum_size) ⇒ Object
Send `maximum_size` bytes of data using the specified `stream`.
- #window_updated(size) ⇒ Object
Constructor Details
#initialize(stream, body, task: Task.current) ⇒ Buffer
Returns a new instance of Buffer.
29 30 31 32 33 34 35 36 37 38 |
# File 'lib/async/http/protocol/http2/stream.rb', line 29

# Build a buffer that forwards chunks read from `body` to `stream`.
#
# Stores both collaborators, starts with no pending remainder, creates the
# condition that #passthrough waits on when no frame space is available, and
# finally starts #passthrough through `task.async`, keeping the result in
# `@task` so that #close can stop it.
#
# @param stream the object data is sent through.
# @param body the object chunks are read from.
# @param task the task used to start #passthrough (defaults to `Task.current`).
def initialize(stream, body, task: Task.current)
  @stream, @body = stream, body
  @remainder = nil

  @window_updated = Async::Condition.new

  # Started last, so every instance variable above is already in place.
  @task = task.async(&method(:passthrough))
end
Instance Method Details
#close(error) ⇒ Object
100 101 102 103 104 105 106 107 |
# File 'lib/async/http/protocol/http2/stream.rb', line 100

# Close the body (if one is still attached) and stop the passthrough task.
#
# @param error the value handed to the body's `close`.
def close(error)
  body = @body

  if body
    body.close(error)
    # Forget the body so it is not closed a second time.
    @body = nil
  end

  @task&.stop
end
#end_stream ⇒ Object
92 93 94 |
# File 'lib/async/http/protocol/http2/stream.rb', line 92

# Send a data frame with no payload, flagged with `END_STREAM`.
def end_stream
  flags = ::Protocol::HTTP2::END_STREAM

  @stream.send_data(nil, flags)
end
#passthrough(task) ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/async/http/protocol/http2/stream.rb', line 40

# Pump chunks from #read into #send_data until #read returns nothing, then
# call #end_stream.
#
# Before each send, waits on `@window_updated` for as long as the stream
# reports no available frame size. `Async::Stop` is swallowed; on every exit
# path the body (if still attached) is closed with the in-flight exception
# (`$!`) and then dropped.
#
# @param task the task this method runs in (not used by the body).
def passthrough(task)
  while (chunk = read)
    available = @stream.available_frame_size

    # Block until the stream has room for at least one byte.
    until available > 0
      @window_updated.wait
      available = @stream.available_frame_size
    end

    send_data(chunk, available)
  end

  end_stream
rescue Async::Stop
  # Ignore.
ensure
  @body&.close($!)
  @body = nil
end
#push(chunk) ⇒ Object
72 73 74 |
# File 'lib/async/http/protocol/http2/stream.rb', line 72

# Remember `chunk` as the pending remainder; the next #read returns it
# instead of reading from the body.
#
# @param chunk the data to hand back on the next #read.
# @return the same `chunk`.
def push(chunk)
  @remainder = chunk
end
#read ⇒ Object
61 62 63 64 65 66 67 68 69 70 |
# File 'lib/async/http/protocol/http2/stream.rb', line 61

# Return the next chunk to send.
#
# A remainder left by #push takes priority and is consumed; otherwise the
# chunk comes from the body, or is `nil` when no body is attached.
def read
  return @body&.read unless @remainder

  pending, @remainder = @remainder, nil
  pending
end
#send_data(chunk, maximum_size) ⇒ Object
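Send up to `maximum_size` bytes of `chunk` using the stream. If the chunk is larger than `maximum_size`, the bytes that do not fit are saved and sent on a later call. `END_STREAM` is not sent here; `#end_stream` sends it once there are no more chunks.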
79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/async/http/protocol/http2/stream.rb', line 79

# Send at most `maximum_size` bytes of `chunk` through the stream.
#
# A chunk that fits is sent whole. Otherwise only the first `maximum_size`
# bytes are sent and the rest is handed to #push.
#
# @param chunk the data to send; sliced by bytes, not characters.
# @param maximum_size the largest number of bytes to send in this call.
def send_data(chunk, maximum_size)
  total = chunk.bytesize

  if total > maximum_size
    head = chunk.byteslice(0, maximum_size)
    @stream.send_data(head, maximum_size: maximum_size)

    # The window was not big enough to send all the data, so we save it for next time:
    push(chunk.byteslice(maximum_size, total - maximum_size))
  else
    @stream.send_data(chunk, maximum_size: maximum_size)
  end
end
#window_updated(size) ⇒ Object
96 97 98 |
# File 'lib/async/http/protocol/http2/stream.rb', line 96

# Signal the `@window_updated` condition, which #passthrough waits on when
# the stream has no frame space available.
#
# @param size accepted for the caller's benefit; not used here.
def window_updated(size)
  condition = @window_updated
  condition.signal
end