Class: Protocol::HTTP2::Stream
- Inherits:
-
Object
- Object
- Protocol::HTTP2::Stream
- Includes:
- FlowControlled
- Defined in:
- lib/protocol/http2/stream.rb
Overview
A single HTTP 2.0 connection can multiplex multiple streams in parallel: multiple requests and responses can be in flight simultaneously and stream data can be interleaved and prioritized.
This class encapsulates all of the state, transition, flow-control, and error management as defined by the HTTP 2.0 specification. All you have to do is subscribe to appropriate events (marked with “:” prefix in diagram below) and provide your application logic to handle request and response processing.
+--------+
send PP | | recv PP
,--------| idle |--------.
/ | | \
v +--------+ v
+----------+ | +----------+
| | | send H / | |
,——| reserved | | recv H | reserved |——. | | (local) | | | (remote) | | | ----------
v ----------
| | | --------
| | | | recv ES | | send ES | | | send H | ,——-| open |——-. | recv H | | | / | | \ | | | v v --------
v v | | ----------
| ----------
| | | half | | | half | | | | closed | | send R / | closed | | | | (remote) | | recv R | (local) | | | ----------
| ----------
| | | | | | | | send ES / | recv ES / | | | | send R / v send R / | | | | recv R --------
recv R | | | send R / ‘———–>| |<———–’ send R / | | recv R | closed | recv R | ‘———————–>| |<———————-’
+--------+
send: endpoint sends this frame
recv: endpoint receives this frame
H: HEADERS frame (with implied CONTINUATIONs)
PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
ES: END_STREAM flag
R: RST_STREAM frame
State transition methods use a trailing “!”.
Instance Attribute Summary collapse
-
#connection ⇒ Object
readonly
The connection this stream belongs to.
-
#dependency ⇒ Object
readonly
Returns the value of attribute dependency.
-
#id ⇒ Object
readonly
Stream ID (odd for client initiated streams, even otherwise).
-
#local_window ⇒ Object
readonly
Returns the value of attribute local_window.
-
#remote_window ⇒ Object
readonly
Returns the value of attribute remote_window.
-
#state ⇒ Object
Stream state, e.g.
Class Method Summary collapse
Instance Method Summary collapse
-
#accept_push_promise_stream(stream_id, headers) ⇒ Object
Override this function to implement your own push promise logic.
- #active? ⇒ Boolean
-
#close(error = nil) ⇒ Object
Transition directly to closed state.
-
#close!(error_code = nil) ⇒ Object
Transition the stream into the closed state.
-
#closed(error = nil) ⇒ Object
The stream has been closed.
- #closed? ⇒ Boolean
- #consume_remote_window(frame) ⇒ Object
-
#create_push_promise_stream(headers) ⇒ Object
Override this function to implement your own push promise logic.
- #ignore_data(frame) ⇒ Object
-
#initialize(connection, id, state = :idle) ⇒ Stream
constructor
A new instance of Stream.
- #inspect ⇒ Object
- #maximum_frame_size ⇒ Object
- #open! ⇒ Object
-
#opened(error = nil) ⇒ Object
The stream has been opened.
- #parent=(stream) ⇒ Object
- #priority ⇒ Object
- #priority=(priority) ⇒ Object
-
#process_data(frame) ⇒ String
The data that was received.
- #process_headers(frame) ⇒ Object
-
#receive_data(frame) ⇒ Object
DATA frames are subject to flow control and can only be sent when a stream is in the “open” or “half-closed (remote)” state.
- #receive_headers(frame) ⇒ Object
- #receive_push_promise(frame) ⇒ Object
- #receive_reset_stream(frame) ⇒ Object
- #reserved_local! ⇒ Object
- #reserved_remote! ⇒ Object
- #send_data(*arguments, **options) ⇒ Object
-
#send_headers(*arguments) ⇒ Object
The HEADERS frame is used to open a stream, and additionally carries a header block fragment.
-
#send_headers? ⇒ Boolean
HEADERS frames can be sent on a stream in the “idle”, “reserved (local)”, “open”, or “half-closed (remote)” state.
-
#send_push_promise(headers) ⇒ Object
Server push is semantically equivalent to a server responding to a request; however, in this case, that request is also sent by the server, as a PUSH_PROMISE frame.
- #send_reset_stream(error_code = 0) ⇒ Object
- #weight ⇒ Object
- #write_frame(frame) ⇒ Object
Methods included from FlowControlled
#available_frame_size, #available_size, #consume_local_window, #receive_window_update, #request_window_update, #send_window_update, #update_local_window, #window_updated
Constructor Details
#initialize(connection, id, state = :idle) ⇒ Stream
Returns a new instance of Stream.
70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/protocol/http2/stream.rb', line 70 def initialize(connection, id, state = :idle) @connection = connection @id = id @state = state @local_window = Window.new(@connection.local_settings.initial_window_size) @remote_window = Window.new(@connection.remote_settings.initial_window_size) @dependency = Dependency.create(@connection, @id) end |
Instance Attribute Details
#connection ⇒ Object (readonly)
The connection this stream belongs to.
83 84 85 |
# File 'lib/protocol/http2/stream.rb', line 83 def connection @connection end |
#dependency ⇒ Object (readonly)
Returns the value of attribute dependency.
91 92 93 |
# File 'lib/protocol/http2/stream.rb', line 91 def dependency @dependency end |
#id ⇒ Object (readonly)
Stream ID (odd for client initiated streams, even otherwise).
86 87 88 |
# File 'lib/protocol/http2/stream.rb', line 86 def id @id end |
#local_window ⇒ Object (readonly)
Returns the value of attribute local_window.
93 94 95 |
# File 'lib/protocol/http2/stream.rb', line 93 def local_window @local_window end |
#remote_window ⇒ Object (readonly)
Returns the value of attribute remote_window.
94 95 96 |
# File 'lib/protocol/http2/stream.rb', line 94 def remote_window @remote_window end |
#state ⇒ Object
Stream state, e.g. ‘idle`, `closed`.
89 90 91 |
# File 'lib/protocol/http2/stream.rb', line 89 def state @state end |
Class Method Details
.create(connection, id) ⇒ Object
62 63 64 65 66 67 68 |
# File 'lib/protocol/http2/stream.rb', line 62 def self.create(connection, id) stream = self.new(connection, id) connection.streams[id] = stream return stream end |
Instance Method Details
#accept_push_promise_stream(stream_id, headers) ⇒ Object
Override this function to implement your own push promise logic.
425 426 427 |
# File 'lib/protocol/http2/stream.rb', line 425 def accept_push_promise_stream(stream_id, headers) @connection.accept_push_promise_stream(stream_id) end |
#active? ⇒ Boolean
120 121 122 |
# File 'lib/protocol/http2/stream.rb', line 120 def active? @state != :closed && @state != :idle end |
#close(error = nil) ⇒ Object
Transition directly to closed state. Do not pass go, do not collect $200. This method should only be used by ‘Connection#close`.
130 131 132 133 134 135 |
# File 'lib/protocol/http2/stream.rb', line 130 def close(error = nil) unless closed? @state = :closed self.closed(error) end end |
#close!(error_code = nil) ⇒ Object
Transition the stream into the closed state.
244 245 246 247 248 249 250 251 252 253 254 255 |
# File 'lib/protocol/http2/stream.rb', line 244 def close!(error_code = nil) @state = :closed @connection.delete(@id) if error_code error = StreamError.new("Stream closed!", error_code) end self.closed(error) return self end |
#closed(error = nil) ⇒ Object
The stream has been closed. If closed due to a stream reset, the error will be set.
239 240 |
# File 'lib/protocol/http2/stream.rb', line 239 def closed(error = nil) end |
#closed? ⇒ Boolean
124 125 126 |
# File 'lib/protocol/http2/stream.rb', line 124 def closed? @state == :closed end |
#consume_remote_window(frame) ⇒ Object
187 188 189 190 191 |
# File 'lib/protocol/http2/stream.rb', line 187 def consume_remote_window(frame) super @connection.consume_remote_window(frame) end |
#create_push_promise_stream(headers) ⇒ Object
Override this function to implement your own push promise logic.
403 404 405 |
# File 'lib/protocol/http2/stream.rb', line 403 def create_push_promise_stream(headers) @connection.create_push_promise_stream end |
#ignore_data(frame) ⇒ Object
322 323 324 |
# File 'lib/protocol/http2/stream.rb', line 322 def ignore_data(frame) # Async.logger.warn(self) {"Received headers in state: #{@state}!"} end |
#inspect ⇒ Object
440 441 442 |
# File 'lib/protocol/http2/stream.rb', line 440 def inspect "\#<#{self.class} id=#{@id} state=#{@state}>" end |
#maximum_frame_size ⇒ Object
112 113 114 |
# File 'lib/protocol/http2/stream.rb', line 112 def maximum_frame_size @connection.available_frame_size end |
#open! ⇒ Object
226 227 228 229 230 231 232 233 234 235 236 |
# File 'lib/protocol/http2/stream.rb', line 226 def open! if @state == :idle @state = :open else raise ProtocolError, "Cannot open stream in state: #{@state}" end self.opened return self end |
#opened(error = nil) ⇒ Object
The stream has been opened.
223 224 |
# File 'lib/protocol/http2/stream.rb', line 223 def opened(error = nil) end |
#parent=(stream) ⇒ Object
108 109 110 |
# File 'lib/protocol/http2/stream.rb', line 108 def parent=(stream) @dependency.parent = stream.dependency end |
#priority ⇒ Object
100 101 102 |
# File 'lib/protocol/http2/stream.rb', line 100 def priority @dependency.priority end |
#priority=(priority) ⇒ Object
104 105 106 |
# File 'lib/protocol/http2/stream.rb', line 104 def priority= priority @dependency.priority = priority end |
#process_data(frame) ⇒ String
Returns the data that was received.
318 319 320 |
# File 'lib/protocol/http2/stream.rb', line 318 def process_data(frame) frame.unpack end |
#process_headers(frame) ⇒ Object
270 271 272 273 274 275 276 277 278 279 |
# File 'lib/protocol/http2/stream.rb', line 270 def process_headers(frame) # Receiving request headers: priority, data = frame.unpack if priority @dependency.process_priority(priority) end @connection.decode_headers(data) end |
#receive_data(frame) ⇒ Object
DATA frames are subject to flow control and can only be sent when a stream is in the “open” or “half-closed (remote)” state. The entire DATA frame payload is included in flow control, including the Pad Length and Padding fields if present. If a DATA frame is received whose stream is not in “open” or “half-closed (local)” state, the recipient MUST respond with a stream error of type STREAM_CLOSED.
327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 |
# File 'lib/protocol/http2/stream.rb', line 327 def receive_data(frame) if @state == :open update_local_window(frame) if frame.end_stream? @state = :half_closed_remote end process_data(frame) elsif @state == :half_closed_local update_local_window(frame) process_data(frame) if frame.end_stream? close! end elsif self.closed? ignore_data(frame) else # If a DATA frame is received whose stream is not in "open" or "half-closed (local)" state, the recipient MUST respond with a stream error (Section 5.4.2) of type STREAM_CLOSED. self.send_reset_stream(Error::STREAM_CLOSED) end end |
#receive_headers(frame) ⇒ Object
285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 |
# File 'lib/protocol/http2/stream.rb', line 285 def receive_headers(frame) if @state == :idle if frame.end_stream? @state = :half_closed_remote else open! end process_headers(frame) elsif @state == :reserved_remote process_headers(frame) @state = :half_closed_local elsif @state == :open process_headers(frame) if frame.end_stream? @state = :half_closed_remote end elsif @state == :half_closed_local process_headers(frame) if frame.end_stream? close! end elsif self.closed? ignore_headers(frame) else self.send_reset_stream(Error::STREAM_CLOSED) end end |
#receive_push_promise(frame) ⇒ Object
429 430 431 432 433 434 435 436 437 438 |
# File 'lib/protocol/http2/stream.rb', line 429 def receive_push_promise(frame) promised_stream_id, data = frame.unpack headers = @connection.decode_headers(data) stream = self.accept_push_promise_stream(promised_stream_id, headers) stream.parent = self stream.reserved_remote! return stream, headers end |
#receive_reset_stream(frame) ⇒ Object
352 353 354 355 356 357 358 359 360 361 362 363 |
# File 'lib/protocol/http2/stream.rb', line 352 def receive_reset_stream(frame) if @state == :idle # If a RST_STREAM frame identifying an idle stream is received, the recipient MUST treat this as a connection error (Section 5.4.1) of type PROTOCOL_ERROR. raise ProtocolError, "Cannot receive reset stream in state: #{@state}!" else error_code = frame.unpack close!(error_code) return error_code end end |
#reserved_local! ⇒ Object
382 383 384 385 386 387 388 389 390 |
# File 'lib/protocol/http2/stream.rb', line 382 def reserved_local! if @state == :idle @state = :reserved_local else raise ProtocolError, "Cannot reserve stream in state: #{@state}" end return self end |
#reserved_remote! ⇒ Object
392 393 394 395 396 397 398 399 400 |
# File 'lib/protocol/http2/stream.rb', line 392 def reserved_remote! if @state == :idle @state = :reserved_remote else raise ProtocolError, "Cannot reserve stream in state: #{@state}" end return self end |
#send_data(*arguments, **options) ⇒ Object
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 |
# File 'lib/protocol/http2/stream.rb', line 204 def send_data(*arguments, **) if @state == :open frame = write_data(*arguments, **) if frame.end_stream? @state = :half_closed_local end elsif @state == :half_closed_remote frame = write_data(*arguments, **) if frame.end_stream? close! end else raise ProtocolError, "Cannot send data in state: #{@state}" end end |
#send_headers(*arguments) ⇒ Object
The HEADERS frame is used to open a stream, and additionally carries a header block fragment. HEADERS frames can be sent on a stream in the “idle”, “reserved (local)”, “open”, or “half-closed (remote)” state.
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
# File 'lib/protocol/http2/stream.rb', line 157 def send_headers(*arguments) if @state == :idle frame = write_headers(*arguments) if frame.end_stream? @state = :half_closed_local else open! end elsif @state == :reserved_local frame = write_headers(*arguments) @state = :half_closed_remote elsif @state == :open frame = write_headers(*arguments) if frame.end_stream? @state = :half_closed_local end elsif @state == :half_closed_remote frame = write_headers(*arguments) if frame.end_stream? close! end else raise ProtocolError, "Cannot send headers in state: #{@state}" end end |
#send_headers? ⇒ Boolean
HEADERS frames can be sent on a stream in the “idle”, “reserved (local)”, “open”, or “half-closed (remote)” state. Despite it’s name, it can also be used for trailers.
138 139 140 |
# File 'lib/protocol/http2/stream.rb', line 138 def send_headers? @state == :idle or @state == :reserved_local or @state == :open or @state == :half_closed_remote end |
#send_push_promise(headers) ⇒ Object
Server push is semantically equivalent to a server responding to a request; however, in this case, that request is also sent by the server, as a PUSH_PROMISE frame.
409 410 411 412 413 414 415 416 417 418 419 420 421 422 |
# File 'lib/protocol/http2/stream.rb', line 409 def send_push_promise(headers) if @state == :open or @state == :half_closed_remote promised_stream = self.create_push_promise_stream(headers) promised_stream.reserved_local! # The headers are the same as if the client had sent a request: write_push_promise(promised_stream.id, headers) # The server should call send_headers on the promised stream to begin sending the response: return promised_stream else raise ProtocolError, "Cannot send push promise in state: #{@state}" end end |
#send_reset_stream(error_code = 0) ⇒ Object
257 258 259 260 261 262 263 264 265 266 267 268 |
# File 'lib/protocol/http2/stream.rb', line 257 def send_reset_stream(error_code = 0) if @state != :idle and @state != :closed frame = ResetStreamFrame.new(@id) frame.pack(error_code) write_frame(frame) close! else raise ProtocolError, "Cannot send reset stream (#{error_code}) in state: #{@state}" end end |
#weight ⇒ Object
96 97 98 |
# File 'lib/protocol/http2/stream.rb', line 96 def weight @dependency.weight end |
#write_frame(frame) ⇒ Object
116 117 118 |
# File 'lib/protocol/http2/stream.rb', line 116 def write_frame(frame) @connection.write_frame(frame) end |