Class: Async::HTTP::Protocol::HTTP2::Stream::Output
- Inherits:
-
Object
- Object
- Async::HTTP::Protocol::HTTP2::Stream::Output
- Defined in:
- lib/async/http/protocol/http2/stream.rb
Instance Method Summary collapse
- #close(error) ⇒ Object
- #end_stream ⇒ Object
-
#initialize(stream, body, task: Task.current) ⇒ Output
constructor
A new instance of Output.
-
#passthrough(task) ⇒ Object
Reads chunks from the given body and writes them to the stream as fast as possible.
- #push(chunk) ⇒ Object
- #read ⇒ Object
-
#send_data(chunk, maximum_size) ⇒ Object
Send ‘maximum_size` bytes of data using the specified `stream`.
- #window_updated(size) ⇒ Object
Constructor Details
#initialize(stream, body, task: Task.current) ⇒ Output
Returns a new instance of Output.
47 48 49 50 51 52 53 54 55 56 |
# File 'lib/async/http/protocol/http2/stream.rb', line 47 def initialize(stream, body, task: Task.current) @stream = stream @body = body @remainder = nil @window_updated = Async::Condition.new @task = task.async(&self.method(:passthrough)) end |
Instance Method Details
#close(error) ⇒ Object
119 120 121 122 123 124 125 126 |
# File 'lib/async/http/protocol/http2/stream.rb', line 119 def close(error) if @body @body.close(error) @body = nil end @task&.stop end |
#end_stream ⇒ Object
111 112 113 |
# File 'lib/async/http/protocol/http2/stream.rb', line 111 def end_stream @stream.send_data(nil, ::Protocol::HTTP2::END_STREAM) end |
#passthrough(task) ⇒ Object
Reads chunks from the given body and writes them to the stream as fast as possible.
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/async/http/protocol/http2/stream.rb', line 59 def passthrough(task) while chunk = self.read maximum_size = @stream.available_frame_size while maximum_size <= 0 @window_updated.wait maximum_size = @stream.available_frame_size end self.send_data(chunk, maximum_size) end self.end_stream rescue Async::Stop # Ignore. ensure @body&.close($!) @body = nil end |
#push(chunk) ⇒ Object
91 92 93 |
# File 'lib/async/http/protocol/http2/stream.rb', line 91 def push(chunk) @remainder = chunk end |
#read ⇒ Object
80 81 82 83 84 85 86 87 88 89 |
# File 'lib/async/http/protocol/http2/stream.rb', line 80 def read if @remainder remainder = @remainder @remainder = nil return remainder else @body&.read end end |
#send_data(chunk, maximum_size) ⇒ Object
Send ‘maximum_size` bytes of data using the specified `stream`. If the buffer has no more chunks, `END_STREAM` will be sent on the final chunk.
98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/async/http/protocol/http2/stream.rb', line 98 def send_data(chunk, maximum_size) if chunk.bytesize <= maximum_size @stream.send_data(chunk, maximum_size: maximum_size) else @stream.send_data(chunk.byteslice(0, maximum_size), maximum_size: maximum_size) # The window was not big enough to send all the data, so we save it for next time: self.push( chunk.byteslice(maximum_size, chunk.bytesize - maximum_size) ) end end |
#window_updated(size) ⇒ Object
115 116 117 |
# File 'lib/async/http/protocol/http2/stream.rb', line 115 def window_updated(size) @window_updated.signal end |