Class: HTTP2::Stream
- Inherits: Object
  - Object
    - HTTP2::Stream
- Includes: Emitter, Error, FlowBuffer
- Defined in: lib/http/2/stream.rb
Overview
A single HTTP 2.0 connection can multiplex multiple streams in parallel: multiple requests and responses can be in flight simultaneously and stream data can be interleaved and prioritized.
This class encapsulates all of the state, transition, flow-control, and error management as defined by the HTTP 2.0 specification. All you have to do is subscribe to appropriate events (marked with “:” prefix in diagram below) and provide your application logic to handle request and response processing.
                      +--------+
                 PP   |        |   PP
             ,--------|  idle  |--------.
            /         |        |         \
           v          +--------+          v
    +----------+          |           +----------+
    |          |          | H         |          |
,---|:reserved |          |           |:reserved |---.
|   | (local)  |          v           | (remote) |   |
|   +----------+      +--------+      +----------+   |
|      | :active      |        |      :active |      |
|      |      ,-------|:active |-------.      |      |
|      | H   /   ES   |        |   ES   \   H |      |
|      v    v         +--------+         v    v      |
|   +-----------+         |           +-----------+  |
|   |:half_close|         |           |:half_close|  |
|   |  (remote) |         |           |  (local)  |  |
|   +-----------+         |           +-----------+  |
|        |                v                 |        |
|        |    ES/R    +--------+    ES/R    |        |
|        `----------->|        |<-----------'        |
| R                   | :close |                   R |
`-------------------->|        |<--------------------'
                      +--------+
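The diagram above can also be read as a transition table. The following is a minimal sketch, not the library's implementation: TRANSITIONS, next_state and the state names are hypothetical and simply mirror the diagram labels, and PP, H, ES and R are assumed to stand for PUSH_PROMISE, HEADERS, the END_STREAM flag and RST_STREAM, as in the state diagram of the HTTP/2 specification. The send/receive directions are also taken from that specification, since the diagram does not label them.

```ruby
# Simplified model of the stream state diagram. Keys are [direction, label]
# pairs. All names are illustrative and are not HTTP2::Stream internals.
TRANSITIONS = {
  idle: {
    %i[send pp] => :reserved_local,
    %i[recv pp] => :reserved_remote,
    %i[send h] => :active,
    %i[recv h] => :active
  },
  reserved_local: { %i[send h] => :half_close_remote },
  reserved_remote: { %i[recv h] => :half_close_local },
  active: {
    %i[recv es] => :half_close_remote,
    %i[send es] => :half_close_local
  },
  half_close_remote: { %i[send es] => :close },
  half_close_local: { %i[recv es] => :close }
}.freeze

# Returns the next state, or nil when the model has no such transition.
def next_state(state, direction, label)
  # RST_STREAM (R) closes any stream that has left idle. The diagram draws
  # this for the reserved and half-closed states; the HTTP/2 specification
  # also allows it from the open (:active) state.
  return :close if label == :r && TRANSITIONS.key?(state) && state != :idle

  TRANSITIONS.fetch(state, {})[[direction, label]]
end

# A server-side request/response exchange: request headers arrive, the
# request body ends, then the response ends.
state = :idle
[%i[recv h], %i[recv es], %i[send es]].each do |direction, label|
  state = next_state(state, direction, label)
end
state # => :close
```

Keeping the table as data makes it easy to check an event sequence against the diagram without touching a real connection.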
Instance Attribute Summary
- #closed ⇒ Object (readonly): Reason why connection was closed.
- #dependency ⇒ Object (readonly): Returns the value of attribute dependency.
- #id ⇒ Object (readonly): Stream ID (odd for client initiated streams, even otherwise).
- #local_window ⇒ Object (readonly, also: #window): Size of current stream flow control window.
- #parent ⇒ Object (readonly): Request parent stream of push stream.
- #remote_window ⇒ Object (readonly): Returns the value of attribute remote_window.
- #state ⇒ Object (readonly): Stream state as defined by HTTP 2.0.
- #weight ⇒ Object (readonly): Stream priority as set by initiator.
Instance Method Summary
- #cancel ⇒ Object: Sends a RST_STREAM indicating that the stream is no longer needed.
- #chunk_data(payload, max_size) ⇒ Object: Splits data into chunks of at most max_size bytes, yields each chunk except the last, then returns the final chunk.
- #close(error = :stream_closed) ⇒ Object: Sends a RST_STREAM frame, which closes the current stream; this does not close the underlying connection.
- #data(payload, end_stream: true) ⇒ Object: Sends a DATA frame containing response payload.
- #headers(headers, end_headers: true, end_stream: false) ⇒ Object: Sends a HEADERS frame containing HTTP response headers.
- #initialize(connection:, id:, weight: 16, dependency: 0, exclusive: false, parent: nil, state: :idle) ⇒ Stream (constructor): Initializes a new stream.
- #promise(headers, end_headers: true, &block) ⇒ Object
- #receive(frame) ⇒ Object (also: #<<): Processes incoming HTTP 2.0 frames.
- #refuse ⇒ Object: Sends a RST_STREAM indicating that the stream has been refused prior to performing any application processing.
- #reprioritize(weight: 16, dependency: 0, exclusive: false) ⇒ Object: Sends a PRIORITY frame with new stream priority value (can only be performed by the client).
- #send(frame) ⇒ Object: Processes outgoing HTTP 2.0 frames.
- #window_update(increment) ⇒ Object: Sends a WINDOW_UPDATE frame to the peer.
Methods included from Emitter
Methods included from FlowBuffer
Constructor Details
#initialize(connection:, id:, weight: 16, dependency: 0, exclusive: false, parent: nil, state: :idle) ⇒ Stream
Initializes a new stream.
Note that you should never have to call this directly. To create a new client initiated stream, use Connection#new_stream. Similarly, Connection emits new stream objects when frames for new streams are received.
# File 'lib/http/2/stream.rb', line 75
def initialize(connection:, id:, weight: 16, dependency: 0, exclusive: false, parent: nil, state: :idle)
  @connection = connection
  @id = id
  @weight = weight
  @dependency = dependency
  process_priority(weight: weight, stream_dependency: dependency, exclusive: exclusive)
  @local_window_max_size = connection.local_settings[:settings_initial_window_size]
  @local_window = connection.local_settings[:settings_initial_window_size]
  @remote_window = connection.remote_settings[:settings_initial_window_size]
  @parent = parent
  @state = state
  @error = false
  @closed = false
  @send_buffer = []

  on(:window) { |v| @remote_window = v }
  on(:local_window) { |v| @local_window_max_size = @local_window = v }
end
Instance Attribute Details
#closed ⇒ Object (readonly)
Reason why connection was closed.
# File 'lib/http/2/stream.rb', line 60
def closed
  @closed
end
#dependency ⇒ Object (readonly)
Returns the value of attribute dependency.
# File 'lib/http/2/stream.rb', line 52
def dependency
  @dependency
end
#id ⇒ Object (readonly)
Stream ID (odd for client initiated streams, even otherwise).
# File 'lib/http/2/stream.rb', line 42
def id
  @id
end
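The odd/even convention for stream IDs can be expressed as a pair of predicates. This is a small sketch under stated assumptions: client_initiated? and server_initiated? are hypothetical helpers, not methods of HTTP2::Stream, which only exposes the raw #id. Stream 0 is excluded because HTTP/2 reserves it for connection-level frames.

```ruby
# Hypothetical helpers illustrating the stream ID parity rule.
def client_initiated?(stream_id)
  stream_id.odd?
end

def server_initiated?(stream_id)
  # Stream 0 is reserved for the connection itself, so it is not a
  # server-initiated (pushed) stream.
  stream_id.even? && stream_id.positive?
end

client_initiated?(1) # => true
server_initiated?(2) # => true
server_initiated?(0) # => false
```

The same parity test appears in #reprioritize, which raises a stream error when @id is even.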
#local_window ⇒ Object (readonly) Also known as: window
Size of current stream flow control window.
# File 'lib/http/2/stream.rb', line 55
def local_window
  @local_window
end
#parent ⇒ Object (readonly)
Request parent stream of push stream.
# File 'lib/http/2/stream.rb', line 48
def parent
  @parent
end
#remote_window ⇒ Object (readonly)
Returns the value of attribute remote_window.
# File 'lib/http/2/stream.rb', line 56
def remote_window
  @remote_window
end
#state ⇒ Object (readonly)
Stream state as defined by HTTP 2.0.
# File 'lib/http/2/stream.rb', line 45
def state
  @state
end
#weight ⇒ Object (readonly)
Stream priority as set by initiator.
# File 'lib/http/2/stream.rb', line 51
def weight
  @weight
end
Instance Method Details
#cancel ⇒ Object
Sends a RST_STREAM indicating that the stream is no longer needed.
# File 'lib/http/2/stream.rb', line 217
def cancel
  send(type: :rst_stream, error: :cancel)
end
#chunk_data(payload, max_size) ⇒ Object
Splits data into chunks of at most max_size bytes, yields each chunk except the last, then returns the final chunk.
# File 'lib/http/2/stream.rb', line 198
def chunk_data(payload, max_size)
  total = payload.bytesize
  cursor = 0
  while (total - cursor) > max_size
    yield payload.byteslice(cursor, max_size)
    cursor += max_size
  end
  payload.byteslice(cursor, total - cursor)
end
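Because #chunk_data depends only on its arguments, its behavior can be tried outside the class. The block below is a standalone copy of the method body shown above, followed by two illustrative calls; the sample payloads are made up for the example.

```ruby
# Standalone copy of #chunk_data so it can run without an HTTP2::Stream.
def chunk_data(payload, max_size)
  total = payload.bytesize
  cursor = 0
  # Yield every full-size chunk that is followed by more data.
  while (total - cursor) > max_size
    yield payload.byteslice(cursor, max_size)
    cursor += max_size
  end
  # Whatever remains (at most max_size bytes) is returned, not yielded.
  payload.byteslice(cursor, total - cursor)
end

yielded = []
final = chunk_data('abcdefghij', 4) { |chunk| yielded << chunk }
yielded # => ["abcd", "efgh"]
final   # => "ij"

# When the payload is an exact multiple of max_size, the last full chunk
# is returned rather than yielded.
exact = []
last = chunk_data('abcdefgh', 4) { |chunk| exact << chunk }
exact # => ["abcd"]
last  # => "efgh"
```

Returning the final chunk instead of yielding it lets the caller attach different flags to the last frame.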
#close(error = :stream_closed) ⇒ Object
Sends a RST_STREAM frame, which closes the current stream; this does not close the underlying connection.
# File 'lib/http/2/stream.rb', line 212
def close(error = :stream_closed)
  send(type: :rst_stream, error: error)
end
#data(payload, end_stream: true) ⇒ Object
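Sends a DATA frame containing response payload. Payloads larger than the peer's maximum frame size are split across several DATA frames, and only the last frame carries the END_STREAM flag.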
# File 'lib/http/2/stream.rb', line 180
def data(payload, end_stream: true)
  # Split data according to each frame is smaller enough
  # TODO: consider padding?
  max_size = @connection.remote_settings[:settings_max_frame_size]

  if payload.bytesize > max_size
    payload = chunk_data(payload, max_size) do |chunk|
      send(type: :data, flags: [], payload: chunk)
    end
  end

  flags = []
  flags << :end_stream if end_stream
  send(type: :data, flags: flags, payload: payload)
end
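To see which frames #data hands to #send for a given maximum frame size, the splitting can be reproduced on plain hashes. This is a sketch only: data_frames is a hypothetical helper, the frame size of 4 is chosen for readability, and the flow-control buffering that #send applies afterwards is not modeled.

```ruby
# Hypothetical helper: lists the frames #data would pass to #send.
def data_frames(payload, max_size, end_stream: true)
  frames = []
  cursor = 0
  # Full-size chunks go out with no flags, as in #data.
  while (payload.bytesize - cursor) > max_size
    frames << { type: :data, flags: [], payload: payload.byteslice(cursor, max_size) }
    cursor += max_size
  end
  # Only the final frame may carry END_STREAM.
  flags = end_stream ? [:end_stream] : []
  rest = payload.byteslice(cursor, payload.bytesize - cursor)
  frames << { type: :data, flags: flags, payload: rest }
  frames
end

frames = data_frames('x' * 10, 4)
frames.map { |frame| [frame[:payload].bytesize, frame[:flags]] }
# => [[4, []], [4, []], [2, [:end_stream]]]
```

Passing end_stream: false leaves the stream open, which is useful when a response body is written in several calls.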
#headers(headers, end_headers: true, end_stream: false) ⇒ Object
Sends a HEADERS frame containing HTTP response headers.
# File 'lib/http/2/stream.rb', line 151
def headers(headers, end_headers: true, end_stream: false)
  flags = []
  flags << :end_headers if end_headers
  flags << :end_stream if end_stream

  send(type: :headers, flags: flags, payload: headers)
end
#promise(headers, end_headers: true, &block) ⇒ Object
# File 'lib/http/2/stream.rb', line 159
def promise(headers, end_headers: true, &block)
  fail ArgumentError, 'must provide callback' unless block_given?

  flags = end_headers ? [:end_headers] : []
  emit(:promise, self, headers, flags, &block)
end
#receive(frame) ⇒ Object Also known as: <<
Processes incoming HTTP 2.0 frames. The frames must be decoded upstream.
# File 'lib/http/2/stream.rb', line 97
def receive(frame)
  transition(frame, false)

  case frame[:type]
  when :data
    update_local_window(frame)
    # Emit DATA frame
    emit(:data, frame[:payload]) unless frame[:ignore]
    calculate_window_update(@local_window_max_size)
  when :headers
    emit(:headers, frame[:payload]) unless frame[:ignore]
  when :push_promise
    emit(:promise_headers, frame[:payload]) unless frame[:ignore]
  when :priority
    process_priority(frame)
  when :window_update
    process_window_update(frame)
  when :altsvc, :blocked
    emit(frame[:type], frame)
  end

  complete_transition(frame)
end
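The way #receive turns decoded frames into application events can be illustrated with a small stand-in. This is a sketch under stated assumptions: MiniEmitter and dispatch are hypothetical names that imitate the on/emit pattern used above, they are not the gem's Emitter module, and only the emit calls of #receive are mirrored (state transitions and window accounting are left out).

```ruby
# Minimal stand-in for an event emitter; not part of the http-2 gem.
class MiniEmitter
  def initialize
    @listeners = Hash.new { |hash, event| hash[event] = [] }
  end

  # Registers a callback for the given event name.
  def on(event, &block)
    @listeners[event] << block
  end

  # Calls every callback registered for the event.
  def emit(event, *args)
    @listeners[event].each { |listener| listener.call(*args) }
  end

  # Mirrors only the emit calls of #receive for three frame types.
  def dispatch(frame)
    return if frame[:ignore]

    case frame[:type]
    when :headers      then emit(:headers, frame[:payload])
    when :data         then emit(:data, frame[:payload])
    when :push_promise then emit(:promise_headers, frame[:payload])
    end
  end
end

stream = MiniEmitter.new
log = []
stream.on(:headers) { |headers| log << [:headers, headers] }
stream.on(:data)    { |chunk| log << [:data, chunk] }

stream.dispatch(type: :headers, payload: [[':status', '200']])
stream.dispatch(type: :data, payload: 'hello')
stream.dispatch(type: :data, payload: 'skipped', ignore: true)
log # => [[:headers, [[":status", "200"]]], [:data, "hello"]]
```

Application code only needs the on calls; the dispatch step corresponds to work the stream performs internally when a frame arrives.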
#refuse ⇒ Object
Sends a RST_STREAM indicating that the stream has been refused prior to performing any application processing.
# File 'lib/http/2/stream.rb', line 223
def refuse
  send(type: :rst_stream, error: :refused_stream)
end
#reprioritize(weight: 16, dependency: 0, exclusive: false) ⇒ Object
Sends a PRIORITY frame with new stream priority value (can only be performed by the client).
# File 'lib/http/2/stream.rb', line 171
def reprioritize(weight: 16, dependency: 0, exclusive: false)
  stream_error if @id.even?
  send(type: :priority, weight: weight, stream_dependency: dependency, exclusive: exclusive)
end
#send(frame) ⇒ Object
Processes outgoing HTTP 2.0 frames. Data frames may be automatically split and buffered based on maximum frame size and current stream flow control window size.
# File 'lib/http/2/stream.rb', line 127
def send(frame)
  process_priority(frame) if frame[:type] == :priority

  case frame[:type]
  when :data
    # @remote_window is maintained in send_data
    send_data(frame)
  when :window_update
    manage_state(frame) do
      @local_window += frame[:increment]
      emit(:frame, frame)
    end
  else
    manage_state(frame) do
      emit(:frame, frame)
    end
  end
end
#window_update(increment) ⇒ Object
Sends a WINDOW_UPDATE frame to the peer.
# File 'lib/http/2/stream.rb', line 230
def window_update(increment)
  # emit stream-level WINDOW_UPDATE unless stream is closed
  return if @state == :closed || @state == :remote_closed
  send(type: :window_update, increment: increment)
end
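The effect of WINDOW_UPDATE on the local flow-control window can be pictured with simple arithmetic. The class below is a hypothetical model, not the FlowBuffer code: it assumes that a received DATA frame reduces the window by its payload size, and it reuses the `@local_window += frame[:increment]` step shown in #send. The starting value 65,535 is the default initial window size in the HTTP/2 specification.

```ruby
# Hypothetical model of a stream's receive window; names are illustrative.
class ReceiveWindow
  attr_reader :size

  def initialize(initial_size)
    @size = initial_size
  end

  # Assumption: a received DATA frame consumes window space equal to its
  # payload size in bytes.
  def consume(payload)
    @size -= payload.bytesize
  end

  # Sending WINDOW_UPDATE grants the peer `increment` more bytes, matching
  # the `@local_window += frame[:increment]` step in #send.
  def update(increment)
    @size += increment
  end
end

window = ReceiveWindow.new(65_535)
window.consume('x' * 1_000)
window.size # => 64535
window.update(1_000)
window.size # => 65535
```

If the window is never replenished, the peer eventually has to stop sending DATA on the stream, which is why an increment is sent as data is consumed.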