Class: Protocol::HTTP2::Stream
- Inherits:
-
Object
- Object
- Protocol::HTTP2::Stream
- Includes:
- FlowControlled
- Defined in:
- lib/protocol/http2/stream.rb
Overview
A single HTTP/2 connection can multiplex multiple streams in parallel: multiple requests and responses can be in flight simultaneously and stream data can be interleaved and prioritized.
This class encapsulates all of the state, transition, flow-control, and error management as defined by the HTTP 2.0 specification. All you have to do is subscribe to appropriate events (marked with “:” prefix in diagram below) and provide your application logic to handle request and response processing.
“‘
┌────────┐
send PP │ │ recv PP
┌──────────┤ idle ├──────────┐
│ │ │ │
▼ └───┬────┘ ▼
┌──────────┐ │ ┌──────────┐
│ │ │ send H / │ │
┌──────┼ reserved │ │ recv H │ reserved ├──────┐│ │ (local) │ │ │ (remote) │ ││ └───┬──────┘ ▼ └──────┬───┘ ││ │ ┌────────┐ │ ││ │ recv ES │ │ send ES │ ││ send H │ ┌─────────┤ open ├─────────┐ │ recv H ││ │ │ │ │ │ │ ││ ▼ ▼ └───┬────┘ ▼ ▼ ││ ┌──────────┐ │ ┌──────────┐ ││ │ half │ │ │ half │ ││ │ closed │ │ send R / │ closed │ ││ │ (remote) │ │ recv R │ (local) │ ││ └────┬─────┘ │ └─────┬────┘ ││ │ │ │ ││ │ send ES / │ recv ES / │ ││ │ send R / ▼ send R / │ ││ │ recv R ┌────────┐ recv R │ ││ send R / └───────────►│ │◄───────────┘ send R / ││ recv R │ closed │ recv R │└───────────────────────►│ │◄───────────────────────┘
└────────┘
“‘
-
‘send`: endpoint sends this frame
-
‘recv`: endpoint receives this frame
-
H: HEADERS frame (with implied CONTINUATIONs)
-
PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
-
ES: END_STREAM flag
-
R: RST_STREAM frame
State transition methods use a trailing “!”.
Instance Attribute Summary collapse
-
#connection ⇒ Object
readonly
The connection this stream belongs to.
-
#id ⇒ Object
readonly
Stream ID (odd for client initiated streams, even otherwise).
-
#local_window ⇒ Object
readonly
Returns the value of attribute local_window.
-
#priority ⇒ Object
Returns the value of attribute priority.
-
#remote_window ⇒ Object
readonly
Returns the value of attribute remote_window.
-
#state ⇒ Object
Stream state, e.g.
- #the priority of the stream.(priorityofthestream.) ⇒ Object readonly
Class Method Summary collapse
-
.create(connection, id) ⇒ Object
Create a new stream and add it to the connection.
Instance Method Summary collapse
-
#accept_push_promise_stream(stream_id, headers) ⇒ Object
Override this function to implement your own push promise logic.
-
#active? ⇒ Boolean
Check if the stream is active (not idle or closed).
-
#close(error = nil) ⇒ Object
Transition directly to closed state.
-
#close!(error_code = nil) ⇒ Object
Transition the stream into the closed state.
-
#closed(error = nil) ⇒ Object
The stream has been closed.
-
#closed? ⇒ Boolean
Check if the stream is closed.
-
#consume_remote_window(frame) ⇒ Object
Consume from the remote window for both stream and connection.
-
#create_push_promise_stream(headers) ⇒ Object
Override this function to implement your own push promise logic.
-
#ignore_data(frame) ⇒ Object
Ignore data frame when in an invalid state.
-
#initialize(connection, id, state = :idle) ⇒ Stream
constructor
Initialize a new stream.
-
#inspect ⇒ Object
Get a string representation of the stream.
-
#maximum_frame_size ⇒ Object
Get the maximum frame size for this stream.
-
#open! ⇒ Object
Open the stream by transitioning from idle to open state.
-
#process_data(frame) ⇒ String
The data that was received.
-
#process_headers(frame) ⇒ Object
Process headers frame and decode the header block.
-
#receive_data(frame) ⇒ Object
DATA frames are subject to flow control and can only be sent when a stream is in the “open” or “half-closed (remote)” state.
-
#receive_headers(frame) ⇒ Object
Receive and process a headers frame on this stream.
-
#receive_push_promise(frame) ⇒ Object
Receive and process a PUSH_PROMISE frame on this stream.
-
#receive_reset_stream(frame) ⇒ Object
Receive and process a RST_STREAM frame on this stream.
-
#reserved_local! ⇒ Object
Transition stream to reserved local state.
-
#reserved_remote! ⇒ Object
Transition stream to reserved remote state.
-
#send_data(*arguments, **options) ⇒ Object
Send data over this stream.
-
#send_headers(*arguments) ⇒ Object
The HEADERS frame is used to open a stream, and additionally carries a header block fragment.
-
#send_headers? ⇒ Boolean
HEADERS frames can be sent on a stream in the “idle”, “reserved (local)”, “open”, or “half-closed (remote)” state.
-
#send_push_promise(headers) ⇒ Object
Server push is semantically equivalent to a server responding to a request; however, in this case, that request is also sent by the server, as a PUSH_PROMISE frame.
-
#send_reset_stream(error_code = 0) ⇒ Object
Send a RST_STREAM frame to reset this stream.
-
#write_frame(frame) ⇒ Object
Write a frame to the connection for this stream.
Methods included from FlowControlled
#available_frame_size, #available_size, #consume_local_window, #receive_window_update, #request_window_update, #send_window_update, #update_local_window, #window_updated
Constructor Details
#initialize(connection, id, state = :idle) ⇒ Stream
Initialize a new stream.
79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/protocol/http2/stream.rb', line 79 def initialize(connection, id, state = :idle) @connection = connection @id = id @state = state @local_window = Window.new(@connection.local_settings.initial_window_size) @remote_window = Window.new(@connection.remote_settings.initial_window_size) @priority = nil end |
Instance Attribute Details
#connection ⇒ Object (readonly)
The connection this stream belongs to.
92 93 94 |
# File 'lib/protocol/http2/stream.rb', line 92 def connection @connection end |
#id ⇒ Object (readonly)
Stream ID (odd for client initiated streams, even otherwise).
95 96 97 |
# File 'lib/protocol/http2/stream.rb', line 95 def id @id end |
#local_window ⇒ Object (readonly)
Returns the value of attribute local_window.
100 101 102 |
# File 'lib/protocol/http2/stream.rb', line 100 def local_window @local_window end |
#priority ⇒ Object
Returns the value of attribute priority.
104 105 106 |
# File 'lib/protocol/http2/stream.rb', line 104 def priority @priority end |
#remote_window ⇒ Object (readonly)
Returns the value of attribute remote_window.
101 102 103 |
# File 'lib/protocol/http2/stream.rb', line 101 def remote_window @remote_window end |
#state ⇒ Object
Stream state, e.g. ‘idle`, `closed`.
98 99 100 |
# File 'lib/protocol/http2/stream.rb', line 98 def state @state end |
#the priority of the stream.(priorityofthestream.) ⇒ Object (readonly)
104 |
# File 'lib/protocol/http2/stream.rb', line 104 attr_accessor :priority |
Class Method Details
.create(connection, id) ⇒ Object
Create a new stream and add it to the connection.
67 68 69 70 71 72 73 |
# File 'lib/protocol/http2/stream.rb', line 67 def self.create(connection, id) stream = self.new(connection, id) connection.streams[id] = stream return stream end |
Instance Method Details
#accept_push_promise_stream(stream_id, headers) ⇒ Object
Override this function to implement your own push promise logic.
444 445 446 |
# File 'lib/protocol/http2/stream.rb', line 444 def accept_push_promise_stream(stream_id, headers) @connection.accept_push_promise_stream(stream_id) end |
#active? ⇒ Boolean
Check if the stream is active (not idle or closed).
120 121 122 |
# File 'lib/protocol/http2/stream.rb', line 120 def active? @state != :closed && @state != :idle end |
#close(error = nil) ⇒ Object
Transition directly to closed state. Do not pass go, do not collect $200. This method should only be used by ‘Connection#close`.
132 133 134 135 136 137 |
# File 'lib/protocol/http2/stream.rb', line 132 def close(error = nil) unless closed? @state = :closed self.closed(error) end end |
#close!(error_code = nil) ⇒ Object
Transition the stream into the closed state.
248 249 250 251 252 253 254 255 256 257 258 259 |
# File 'lib/protocol/http2/stream.rb', line 248 def close!(error_code = nil) @state = :closed @connection.delete(@id) if error_code error = StreamError.new("Stream closed!", error_code) end self.closed(error) return self end |
#closed(error = nil) ⇒ Object
The stream has been closed. If closed due to a stream reset, the error will be set.
243 244 |
# File 'lib/protocol/http2/stream.rb', line 243 def closed(error = nil) end |
#closed? ⇒ Boolean
Check if the stream is closed.
126 127 128 |
# File 'lib/protocol/http2/stream.rb', line 126 def closed? @state == :closed end |
#consume_remote_window(frame) ⇒ Object
Consume from the remote window for both stream and connection.
191 192 193 194 195 |
# File 'lib/protocol/http2/stream.rb', line 191 def consume_remote_window(frame) super @connection.consume_remote_window(frame) end |
#create_push_promise_stream(headers) ⇒ Object
Override this function to implement your own push promise logic.
422 423 424 |
# File 'lib/protocol/http2/stream.rb', line 422 def create_push_promise_stream(headers) @connection.create_push_promise_stream end |
#ignore_data(frame) ⇒ Object
Ignore data frame when in an invalid state.
331 332 333 |
# File 'lib/protocol/http2/stream.rb', line 331 def ignore_data(frame) # Console.warn(self) {"Received headers in state: #{@state}!"} end |
#inspect ⇒ Object
Get a string representation of the stream.
462 463 464 |
# File 'lib/protocol/http2/stream.rb', line 462 def inspect "\#<#{self.class} id=#{@id} state=#{@state}>" end |
#maximum_frame_size ⇒ Object
Get the maximum frame size for this stream.
108 109 110 |
# File 'lib/protocol/http2/stream.rb', line 108 def maximum_frame_size @connection.available_frame_size end |
#open! ⇒ Object
Open the stream by transitioning from idle to open state.
232 233 234 235 236 237 238 239 240 |
# File 'lib/protocol/http2/stream.rb', line 232 def open! if @state == :idle @state = :open else raise ProtocolError, "Cannot open stream in state: #{@state}" end return self end |
#process_data(frame) ⇒ String
Returns the data that was received.
325 326 327 |
# File 'lib/protocol/http2/stream.rb', line 325 def process_data(frame) frame.unpack end |
#process_headers(frame) ⇒ Object
Process headers frame and decode the header block.
279 280 281 282 283 284 |
# File 'lib/protocol/http2/stream.rb', line 279 def process_headers(frame) # Receiving request headers: data = frame.unpack @connection.decode_headers(data) end |
#receive_data(frame) ⇒ Object
DATA frames are subject to flow control and can only be sent when a stream is in the “open” or “half-closed (remote)” state. The entire DATA frame payload is included in flow control, including the Pad Length and Padding fields if present. If a DATA frame is received whose stream is not in “open” or “half-closed (local)” state, the recipient MUST respond with a stream error of type STREAM_CLOSED.
336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 |
# File 'lib/protocol/http2/stream.rb', line 336 def receive_data(frame) if @state == :open update_local_window(frame) if frame.end_stream? @state = :half_closed_remote end process_data(frame) elsif @state == :half_closed_local update_local_window(frame) process_data(frame) if frame.end_stream? close! end elsif self.closed? ignore_data(frame) else # If a DATA frame is received whose stream is not in "open" or "half-closed (local)" state, the recipient MUST respond with a stream error (Section 5.4.2) of type STREAM_CLOSED. self.send_reset_stream(Error::STREAM_CLOSED) end end |
#receive_headers(frame) ⇒ Object
Receive and process a headers frame on this stream.
292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 |
# File 'lib/protocol/http2/stream.rb', line 292 def receive_headers(frame) if @state == :idle if frame.end_stream? @state = :half_closed_remote else open! end process_headers(frame) elsif @state == :reserved_remote process_headers(frame) @state = :half_closed_local elsif @state == :open process_headers(frame) if frame.end_stream? @state = :half_closed_remote end elsif @state == :half_closed_local process_headers(frame) if frame.end_stream? close! end elsif self.closed? ignore_headers(frame) else self.send_reset_stream(Error::STREAM_CLOSED) end end |
#receive_push_promise(frame) ⇒ Object
Receive and process a PUSH_PROMISE frame on this stream.
450 451 452 453 454 455 456 457 458 |
# File 'lib/protocol/http2/stream.rb', line 450 def receive_push_promise(frame) promised_stream_id, data = frame.unpack headers = @connection.decode_headers(data) stream = self.accept_push_promise_stream(promised_stream_id, headers) stream.reserved_remote! return stream, headers end |
#receive_reset_stream(frame) ⇒ Object
Receive and process a RST_STREAM frame on this stream.
365 366 367 368 369 370 371 372 373 374 375 376 |
# File 'lib/protocol/http2/stream.rb', line 365 def receive_reset_stream(frame) if @state == :idle # If a RST_STREAM frame identifying an idle stream is received, the recipient MUST treat this as a connection error (Section 5.4.1) of type PROTOCOL_ERROR. raise ProtocolError, "Cannot receive reset stream in state: #{@state}!" else error_code = frame.unpack close!(error_code) return error_code end end |
#reserved_local! ⇒ Object
Transition stream to reserved local state.
398 399 400 401 402 403 404 405 406 |
# File 'lib/protocol/http2/stream.rb', line 398 def reserved_local! if @state == :idle @state = :reserved_local else raise ProtocolError, "Cannot reserve stream in state: #{@state}" end return self end |
#reserved_remote! ⇒ Object
Transition stream to reserved remote state.
411 412 413 414 415 416 417 418 419 |
# File 'lib/protocol/http2/stream.rb', line 411 def reserved_remote! if @state == :idle @state = :reserved_remote else raise ProtocolError, "Cannot reserve stream in state: #{@state}" end return self end |
#send_data(*arguments, **options) ⇒ Object
Send data over this stream.
211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 |
# File 'lib/protocol/http2/stream.rb', line 211 def send_data(*arguments, **) if @state == :open frame = write_data(*arguments, **) if frame.end_stream? @state = :half_closed_local end elsif @state == :half_closed_remote frame = write_data(*arguments, **) if frame.end_stream? close! end else raise ProtocolError, "Cannot send data in state: #{@state}" end end |
#send_headers(*arguments) ⇒ Object
The HEADERS frame is used to open a stream, and additionally carries a header block fragment. HEADERS frames can be sent on a stream in the “idle”, “reserved (local)”, “open”, or “half-closed (remote)” state.
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 |
# File 'lib/protocol/http2/stream.rb', line 159 def send_headers(*arguments) if @state == :idle frame = write_headers(*arguments) if frame.end_stream? @state = :half_closed_local else open! end elsif @state == :reserved_local frame = write_headers(*arguments) @state = :half_closed_remote elsif @state == :open frame = write_headers(*arguments) if frame.end_stream? @state = :half_closed_local end elsif @state == :half_closed_remote frame = write_headers(*arguments) if frame.end_stream? close! end else raise ProtocolError, "Cannot send headers in state: #{@state}" end end |
#send_headers? ⇒ Boolean
HEADERS frames can be sent on a stream in the “idle”, “reserved (local)”, “open”, or “half-closed (remote)” state. Despite it’s name, it can also be used for trailers.
140 141 142 |
# File 'lib/protocol/http2/stream.rb', line 140 def send_headers? @state == :idle or @state == :reserved_local or @state == :open or @state == :half_closed_remote end |
#send_push_promise(headers) ⇒ Object
Server push is semantically equivalent to a server responding to a request; however, in this case, that request is also sent by the server, as a PUSH_PROMISE frame.
428 429 430 431 432 433 434 435 436 437 438 439 440 441 |
# File 'lib/protocol/http2/stream.rb', line 428 def send_push_promise(headers) if @state == :open or @state == :half_closed_remote promised_stream = self.create_push_promise_stream(headers) promised_stream.reserved_local! # The headers are the same as if the client had sent a request: write_push_promise(promised_stream.id, headers) # The server should call send_headers on the promised stream to begin sending the response: return promised_stream else raise ProtocolError, "Cannot send push promise in state: #{@state}" end end |
#send_reset_stream(error_code = 0) ⇒ Object
Send a RST_STREAM frame to reset this stream.
263 264 265 266 267 268 269 270 271 272 273 274 |
# File 'lib/protocol/http2/stream.rb', line 263 def send_reset_stream(error_code = 0) if @state != :idle and @state != :closed frame = ResetStreamFrame.new(@id) frame.pack(error_code) write_frame(frame) close! else raise ProtocolError, "Cannot send reset stream (#{error_code}) in state: #{@state}" end end |
#write_frame(frame) ⇒ Object
Write a frame to the connection for this stream.
114 115 116 |
# File 'lib/protocol/http2/stream.rb', line 114 def write_frame(frame) @connection.write_frame(frame) end |