Class: Protocol::HTTP2::Connection
- Inherits:
- Object
- Includes:
- FlowControlled
- Defined in:
- lib/protocol/http2/connection.rb
Instance Attribute Summary
-
#dependencies ⇒ Object
readonly
Returns the value of attribute dependencies.
-
#dependency ⇒ Object
readonly
Returns the value of attribute dependency.
-
#framer ⇒ Object
readonly
Returns the value of attribute framer.
-
#local_settings ⇒ Object
Current settings value for local and peer.
-
#local_window ⇒ Object
readonly
Our window for receiving data.
-
#remote_settings ⇒ Object
Returns the value of attribute remote_settings.
-
#remote_stream_id ⇒ Object
readonly
The highest stream_id that has been successfully accepted by this connection.
-
#remote_window ⇒ Object
readonly
Our window for sending data.
-
#state ⇒ Object
Connection state (:new, :open, :closed).
-
#streams ⇒ Object
readonly
Returns the value of attribute streams.
Instance Method Summary
- #[](id) ⇒ Object
-
#accept_push_promise_stream(stream_id, &block) ⇒ Object
Accept an incoming push promise from the other side of the connection.
-
#accept_stream(stream_id, &block) ⇒ Object
Accept an incoming stream from the other side of the connection.
- #client_stream_id?(id) ⇒ Boolean
-
#close(error = nil) ⇒ Object
Close the underlying framer and all streams.
-
#close! ⇒ Object
Transition the connection into the closed state.
-
#closed? ⇒ Boolean
Whether the connection is effectively or actually closed.
-
#closed_stream_id?(id) ⇒ Boolean
This is only valid if the stream doesn't exist in `@streams`.
-
#consume_window(size = self.available_size) ⇒ Object
Traverse active streams and allow them to consume the available flow-control window.
- #create_push_promise_stream(&block) ⇒ Object
-
#create_stream(id = next_stream_id, &block) ⇒ Stream
Create a stream, defaults to an outgoing stream.
- #decode_headers(data) ⇒ Object
- #delete(id) ⇒ Object
- #encode_headers(headers, buffer = String.new.b) ⇒ Object
- #id ⇒ Object
- #idle_stream_id?(id) ⇒ Boolean
-
#ignore_frame?(frame) ⇒ Boolean
Whether a frame should be ignored once the connection is closed (see 6.8. GOAWAY).
-
#initialize(framer, local_stream_id) ⇒ Connection
constructor
A new instance of Connection.
-
#maximum_concurrent_streams ⇒ Object
The maximum number of concurrent streams that this connection can initiate.
-
#maximum_frame_size ⇒ Object
The size of a frame payload is limited by the maximum size that a receiver advertises in the SETTINGS_MAX_FRAME_SIZE setting.
-
#next_stream_id ⇒ Object
Streams are identified with an unsigned 31-bit integer.
- #open! ⇒ Object
-
#process_settings(frame) ⇒ Boolean
In addition to changing the flow-control window for streams that are not yet active, a SETTINGS frame can alter the initial flow-control window size for streams with active flow-control windows (that is, streams in the “open” or “half-closed (remote)” state).
-
#read_frame ⇒ Object
Reads one frame from the network and processes it.
- #receive_continuation(frame) ⇒ Object
- #receive_data(frame) ⇒ Object
- #receive_frame(frame) ⇒ Object
- #receive_goaway(frame) ⇒ Object
-
#receive_headers(frame) ⇒ Object
On the server side, starts a new request.
- #receive_ping(frame) ⇒ Object
- #receive_priority_update(frame) ⇒ Object
- #receive_push_promise(frame) ⇒ Object
- #receive_reset_stream(frame) ⇒ Object
- #receive_settings(frame) ⇒ Object
- #receive_window_update(frame) ⇒ Object
-
#send_goaway(error_code = 0, message = "") ⇒ Object
Tell the remote end that the connection is being shut down.
- #send_ping(data) ⇒ Object
- #send_settings(changes) ⇒ Object
- #server_stream_id?(id) ⇒ Boolean
- #synchronize ⇒ Object
- #update_local_settings(changes) ⇒ Object
- #update_remote_settings(changes) ⇒ Object
- #valid_remote_stream_id?(stream_id) ⇒ Boolean
- #write_frame(frame) ⇒ Object
- #write_frames ⇒ Object
Methods included from FlowControlled
#available_frame_size, #available_size, #consume_local_window, #consume_remote_window, #request_window_update, #send_window_update, #update_local_window, #window_updated
Constructor Details
#initialize(framer, local_stream_id) ⇒ Connection
Returns a new instance of Connection.
    # File 'lib/protocol/http2/connection.rb', line 18
    def initialize(framer, local_stream_id)
      super()

      @state = :new

      # Hash(Integer, Stream)
      @streams = {}

      @framer = framer

      # The next stream id to use:
      @local_stream_id = local_stream_id

      # The biggest remote stream id seen thus far:
      @remote_stream_id = 0

      @local_settings = PendingSettings.new
      @remote_settings = Settings.new

      @decoder = HPACK::Context.new
      @encoder = HPACK::Context.new

      @local_window = LocalWindow.new
      @remote_window = Window.new
    end
Instance Attribute Details
#dependencies ⇒ Object (readonly)
Returns the value of attribute dependencies.
    # File 'lib/protocol/http2/connection.rb', line 128
    def dependencies
      @dependencies
    end
#dependency ⇒ Object (readonly)
Returns the value of attribute dependency.
    # File 'lib/protocol/http2/connection.rb', line 130
    def dependency
      @dependency
    end
#framer ⇒ Object (readonly)
Returns the value of attribute framer.
    # File 'lib/protocol/http2/connection.rb', line 68
    def framer
      @framer
    end
#local_settings ⇒ Object
Current settings value for local and peer.
    # File 'lib/protocol/http2/connection.rb', line 74
    def local_settings
      @local_settings
    end
#local_window ⇒ Object (readonly)
Our window for receiving data. When we receive data, it reduces this window. If the window gets too small, we must send a window update.
    # File 'lib/protocol/http2/connection.rb', line 79
    def local_window
      @local_window
    end
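As a rough illustration of the receive-window bookkeeping described for `#local_window`, the standalone sketch below tracks how much data may still be received and when a window update would be due. `ReceiveWindow` and its half-capacity threshold are assumptions made for this example; they are not the library's `LocalWindow` implementation.

```ruby
# Hypothetical receive window, for illustration only.
class ReceiveWindow
  attr_reader :capacity, :available

  # 0xFFFF (65,535) is the initial window size defined by HTTP/2.
  def initialize(capacity = 0xFFFF)
    @capacity = capacity
    @available = capacity
  end

  # Receiving `size` bytes of data shrinks the window.
  def consume(size)
    @available -= size
  end

  # Assumed policy: the window is "too small" below half of its capacity.
  def limited?
    @available < @capacity / 2
  end

  # Increment a window update would need to carry to restore full capacity.
  def wanted
    @capacity - @available
  end

  # Applying the increment grows the window again.
  def expand(amount)
    @available += amount
  end
end
```

With a capacity of 100, consuming 60 bytes leaves 40 available, so `limited?` becomes true and `wanted` reports the 60 bytes to hand back.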
#remote_settings ⇒ Object
Returns the value of attribute remote_settings.
    # File 'lib/protocol/http2/connection.rb', line 75
    def remote_settings
      @remote_settings
    end
#remote_stream_id ⇒ Object (readonly)
The highest stream_id that has been successfully accepted by this connection.
    # File 'lib/protocol/http2/connection.rb', line 85
    def remote_stream_id
      @remote_stream_id
    end
#remote_window ⇒ Object (readonly)
Our window for sending data. When we send data, it reduces this window.
    # File 'lib/protocol/http2/connection.rb', line 82
    def remote_window
      @remote_window
    end
#state ⇒ Object
Connection state (:new, :open, :closed).
    # File 'lib/protocol/http2/connection.rb', line 71
    def state
      @state
    end
#streams ⇒ Object (readonly)
Returns the value of attribute streams.
    # File 'lib/protocol/http2/connection.rb', line 126
    def streams
      @streams
    end
Instance Method Details
#[](id) ⇒ Object
    # File 'lib/protocol/http2/connection.rb', line 48
    def [] id
      if id.zero?
        self
      else
        @streams[id]
      end
    end
#accept_push_promise_stream(stream_id, &block) ⇒ Object
Accept an incoming push promise from the other side of the connection. On the client side, we accept push promise streams. On the server side, existing streams create push promise streams.
    # File 'lib/protocol/http2/connection.rb', line 350
    def accept_push_promise_stream(stream_id, &block)
      accept_stream(stream_id, &block)
    end
#accept_stream(stream_id, &block) ⇒ Object
Accept an incoming stream from the other side of the connection. On the server side, we accept requests.
    # File 'lib/protocol/http2/connection.rb', line 339
    def accept_stream(stream_id, &block)
      unless valid_remote_stream_id?(stream_id)
        raise ProtocolError, "Invalid stream id: #{stream_id}"
      end

      create_stream(stream_id, &block)
    end
#client_stream_id?(id) ⇒ Boolean
    # File 'lib/protocol/http2/connection.rb', line 417
    def client_stream_id?(id)
      id.odd?
    end
#close(error = nil) ⇒ Object
Close the underlying framer and all streams.
    # File 'lib/protocol/http2/connection.rb', line 97
    def close(error = nil)
      # The underlying socket may already be closed by this point.
      @streams.each_value{|stream| stream.close(error)}
      @streams.clear

    ensure
      if @framer
        @framer.close
        @framer = nil
      end
    end
#close! ⇒ Object
Transition the connection into the closed state.
    # File 'lib/protocol/http2/connection.rb', line 184
    def close!
      @state = :closed

      return self
    end
#closed? ⇒ Boolean
Whether the connection is effectively or actually closed.
    # File 'lib/protocol/http2/connection.rb', line 88
    def closed?
      @state == :closed || @framer.nil?
    end
#closed_stream_id?(id) ⇒ Boolean
This is only valid if the stream doesn't exist in `@streams`.
    # File 'lib/protocol/http2/connection.rb', line 444
    def closed_stream_id?(id)
      if id.zero?
        # The connection "stream id" can never be closed:
        false
      else
        !idle_stream_id?(id)
      end
    end
#consume_window(size = self.available_size) ⇒ Object
Traverse active streams and allow them to consume the available flow-control window.
    # File 'lib/protocol/http2/connection.rb', line 467
    def consume_window(size = self.available_size)
      # Return if there is no window to consume:
      return unless size > 0

      @streams.each_value do |stream|
        if stream.active?
          stream.window_updated(size)
        end
      end
    end
#create_push_promise_stream(&block) ⇒ Object
    # File 'lib/protocol/http2/connection.rb', line 369
    def create_push_promise_stream(&block)
      create_stream(&block)
    end
#create_stream(id = next_stream_id, &block) ⇒ Stream
Create a stream, defaults to an outgoing stream. On the client side, we create requests.
    # File 'lib/protocol/http2/connection.rb', line 357
    def create_stream(id = next_stream_id, &block)
      if @streams.key?(id)
        raise ProtocolError, "Cannot create stream with id #{id}, already exists!"
      end

      if block_given?
        return yield(self, id)
      else
        return Stream.create(self, id)
      end
    end
#decode_headers(data) ⇒ Object
    # File 'lib/protocol/http2/connection.rb', line 113
    def decode_headers(data)
      HPACK::Decompressor.new(data, @decoder, table_size_limit: @local_settings.header_table_size).decode
    end
#delete(id) ⇒ Object
    # File 'lib/protocol/http2/connection.rb', line 92
    def delete(id)
      @streams.delete(id)
    end
#encode_headers(headers, buffer = String.new.b) ⇒ Object
    # File 'lib/protocol/http2/connection.rb', line 109
    def encode_headers(headers, buffer = String.new.b)
      HPACK::Compressor.new(buffer, @encoder, table_size_limit: @remote_settings.header_table_size).encode(headers)
    end
#id ⇒ Object
    # File 'lib/protocol/http2/connection.rb', line 44
    def id
      0
    end
#idle_stream_id?(id) ⇒ Boolean
    # File 'lib/protocol/http2/connection.rb', line 425
    def idle_stream_id?(id)
      if id.even?
        # Server-initiated streams are even.
        if @local_stream_id.even?
          id >= @local_stream_id
        else
          id > @remote_stream_id
        end
      elsif id.odd?
        # Client-initiated streams are odd.
        if @local_stream_id.odd?
          id >= @local_stream_id
        else
          id > @remote_stream_id
        end
      end
    end
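To make the idle and closed checks easier to follow, here is a condensed standalone sketch of the same comparisons. `StreamIdTracker` is a hypothetical class written for this example; it collapses the two parity branches of `#idle_stream_id?` into one test and reuses the `#closed_stream_id?` rule that stream 0 is never closed.

```ruby
# Hypothetical stand-in reproducing the idle/closed id checks, for illustration only.
class StreamIdTracker
  def initialize(local_stream_id, remote_stream_id)
    @local_stream_id = local_stream_id   # next id this side would allocate
    @remote_stream_id = remote_stream_id # highest id accepted from the peer
  end

  def idle_stream_id?(id)
    if id.even? == @local_stream_id.even?
      # Same parity as our own ids: idle if we have not allocated it yet.
      id >= @local_stream_id
    else
      # Peer parity: idle if the peer has not reached this id yet.
      id > @remote_stream_id
    end
  end

  def closed_stream_id?(id)
    # Stream 0 is the connection itself and can never be closed.
    id.zero? ? false : !idle_stream_id?(id)
  end
end
```

For a client whose next id is 7 and whose peer has reached stream 2, ids 7 and 4 are idle, while 5 and 2 count as closed if they are no longer tracked.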
#ignore_frame?(frame) ⇒ Boolean
6.8. GOAWAY There is an inherent race condition between an endpoint starting new streams and the remote sending a GOAWAY frame. To deal with this case, the GOAWAY contains the stream identifier of the last peer-initiated stream that was or might be processed on the sending endpoint in this connection. For instance, if the server sends a GOAWAY frame, the identified stream is the highest-numbered stream initiated by the client. Once sent, the sender will ignore frames sent on streams initiated by the receiver if the stream has an identifier higher than the included last stream identifier. Receivers of a GOAWAY frame MUST NOT open additional streams on the connection, although a new connection can be established for new streams.
    # File 'lib/protocol/http2/connection.rb', line 135
    def ignore_frame?(frame)
      if self.closed?
        # puts "ignore_frame? #{frame.stream_id} -> #{valid_remote_stream_id?(frame.stream_id)} > #{@remote_stream_id}"
        if valid_remote_stream_id?(frame.stream_id)
          return frame.stream_id > @remote_stream_id
        end
      end
    end
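The GOAWAY rule quoted above reduces to a single comparison, sketched here in isolation. `ignore_after_goaway?` and its `peer_initiated:` keyword are hypothetical names for this example; in the listing that role is played by `valid_remote_stream_id?`.

```ruby
# Hypothetical helper: after GOAWAY has been sent, frames on peer-initiated
# streams above the advertised last stream id are ignored.
def ignore_after_goaway?(stream_id, last_stream_id, peer_initiated:)
  peer_initiated && stream_id > last_stream_id
end
```

With a last stream id of 5, a frame on peer stream 9 is ignored, while frames on stream 3 or on locally initiated streams are still processed.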
#maximum_concurrent_streams ⇒ Object
The maximum number of concurrent streams that this connection can initiate. This is a setting that can be changed by the remote peer.
It is not the same as the number of streams that can be accepted by the connection. The number of streams that can be accepted is determined by the local settings, and the number of streams that can be initiated is determined by the remote settings.
    # File 'lib/protocol/http2/connection.rb', line 64
    def maximum_concurrent_streams
      @remote_settings.maximum_concurrent_streams
    end
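The distinction between the two limits can be sketched as follows. `StreamLimits` and its field names are assumptions for this example and do not correspond to a class in the library.

```ruby
# Hypothetical holder for the two limits, for illustration only.
StreamLimits = Struct.new(:local_maximum, :remote_maximum) do
  # We may initiate a stream only while under the limit the peer advertised.
  def can_initiate?(initiated_count)
    initiated_count < remote_maximum
  end

  # We accept a peer's stream only while under the limit we advertised.
  def can_accept?(accepted_count)
    accepted_count < local_maximum
  end
end
```

A connection that advertises 100 locally but receives 10 from the peer may accept up to 100 incoming streams yet initiate only 10.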
#maximum_frame_size ⇒ Object
The size of a frame payload is limited by the maximum size that a receiver advertises in the SETTINGS_MAX_FRAME_SIZE setting.
    # File 'lib/protocol/http2/connection.rb', line 57
    def maximum_frame_size
      @remote_settings.maximum_frame_size
    end
#next_stream_id ⇒ Object
Streams are identified with an unsigned 31-bit integer. Streams initiated by a client MUST use odd-numbered stream identifiers; those initiated by the server MUST use even-numbered stream identifiers. A stream identifier of zero (0x0) is used for connection control messages; the stream identifier of zero cannot be used to establish a new stream.
    # File 'lib/protocol/http2/connection.rb', line 118
    def next_stream_id
      id = @local_stream_id

      @local_stream_id += 2

      return id
    end
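The parity rule can be illustrated with a standalone sketch. `StreamIdAllocator` is a hypothetical stand-in, not part of the library; it mirrors only the increment-by-two logic of `#next_stream_id`, and the starting values of 1 for a client and 2 for a server are assumptions.

```ruby
# Hypothetical stand-in mirroring the increment-by-two logic of #next_stream_id.
class StreamIdAllocator
  def initialize(local_stream_id)
    @local_stream_id = local_stream_id
  end

  # Return the current id, then advance by 2 so the parity never changes.
  def next_stream_id
    id = @local_stream_id
    @local_stream_id += 2
    id
  end
end

client = StreamIdAllocator.new(1) # assumption: a client starts at 1
server = StreamIdAllocator.new(2) # assumption: a server starts at 2

puts 3.times.map { client.next_stream_id }.inspect
puts 3.times.map { server.next_stream_id }.inspect
```

Because each side only ever adds 2, client ids stay odd and server ids stay even, so the two endpoints can never allocate the same identifier.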
#open! ⇒ Object
    # File 'lib/protocol/http2/connection.rb', line 276
    def open!
      @state = :open

      return self
    end
#process_settings(frame) ⇒ Boolean
In addition to changing the flow-control window for streams that are not yet active, a SETTINGS frame can alter the initial flow-control window size for streams with active flow-control windows (that is, streams in the “open” or “half-closed (remote)” state). When the value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the size of all stream flow-control windows that it maintains by the difference between the new value and the old value.
    # File 'lib/protocol/http2/connection.rb', line 253
    def process_settings(frame)
      if frame.acknowledgement?
        # The remote end has confirmed the settings have been received:
        changes = @local_settings.acknowledge

        update_local_settings(changes)

        return true
      else
        # The remote end is updating the settings, we reply with acknowledgement:
        reply = frame.acknowledge

        write_frame(reply)

        changes = frame.unpack
        @remote_settings.update(changes)

        update_remote_settings(changes)

        return false
      end
    end
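The adjustment "by the difference between the new value and the old value" can be shown with a small standalone sketch. `FlowWindow` and `resize` are hypothetical names for this example; how the library's window classes implement `capacity=` is not shown on this page.

```ruby
# Hypothetical minimal window, for illustration only.
FlowWindow = Struct.new(:capacity, :available) do
  # Changing the capacity shifts the available space by the same difference,
  # which may leave the window negative until the peer sends updates.
  def resize(new_capacity)
    self.available += new_capacity - capacity
    self.capacity = new_capacity
  end
end

window = FlowWindow.new(65_535, 60_000)
window.resize(32_768)
puts window.available
```

Here the capacity drops by 32,767, so the available space drops by the same amount instead of being reset to the new initial value.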
#read_frame ⇒ Object
Reads one frame from the network and processes it. Processing the frame updates the state of the connection and related streams. If the frame triggers an error, e.g. a protocol error, the connection will typically emit a goaway frame and re-raise the exception. You should continue processing frames until the underlying connection is closed.
    # File 'lib/protocol/http2/connection.rb', line 149
    def read_frame
      frame = @framer.read_frame(@local_settings.maximum_frame_size)
      # puts "#{self.class} #{@state} read_frame: class=#{frame.class} stream_id=#{frame.stream_id} flags=#{frame.flags} length=#{frame.length} (remote_stream_id=#{@remote_stream_id})"
      # puts "Windows: local_window=#{@local_window.inspect}; remote_window=#{@remote_window.inspect}"

      return if ignore_frame?(frame)

      yield frame if block_given?
      frame.apply(self)

      return frame
    rescue GoawayError => error
      # Go directly to jail. Do not pass go, do not collect $200.
      raise
    rescue ProtocolError => error
      send_goaway(error.code || PROTOCOL_ERROR, error.message)

      raise
    rescue HPACK::Error => error
      send_goaway(COMPRESSION_ERROR, error.message)

      raise
    end
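A read loop in the spirit of "continue processing frames until the underlying connection is closed" might look like the sketch below. `ScriptedConnection` and `drain` are hypothetical; the stand-in only imitates the `#read_frame` and `#closed?` shape of the connection, and how a loop terminates against a real socket is not covered by this page.

```ruby
# Hypothetical stand-in with the same #read_frame / #closed? shape as the connection.
class ScriptedConnection
  def initialize(frames)
    @frames = frames.dup
  end

  # Treated as closed once the scripted frames run out (an assumption of this sketch).
  def closed?
    @frames.empty?
  end

  # Yield the frame to the caller's block before returning it.
  def read_frame
    frame = @frames.shift
    yield frame if block_given?
    frame
  end
end

# Keep reading until the connection reports that it is closed.
def drain(connection)
  seen = []
  seen << connection.read_frame until connection.closed?
  seen
end
```

Calling `drain(ScriptedConnection.new([:settings, :headers, :data]))` returns the three scripted frames in order.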
#receive_continuation(frame) ⇒ Object
    # File 'lib/protocol/http2/connection.rb', line 497
    def receive_continuation(frame)
      raise ProtocolError, "Received unexpected continuation: #{frame.class}"
    end
#receive_data(frame) ⇒ Object
    # File 'lib/protocol/http2/connection.rb', line 321
    def receive_data(frame)
      update_local_window(frame)

      if stream = @streams[frame.stream_id]
        stream.receive_data(frame)
      elsif closed_stream_id?(frame.stream_id)
        # This can occur if one end sent a stream reset, while the other end was sending a data frame. It's mostly harmless.
      else
        raise ProtocolError, "Cannot receive data for stream id #{frame.stream_id}"
      end
    end
#receive_frame(frame) ⇒ Object
    # File 'lib/protocol/http2/connection.rb', line 501
    def receive_frame(frame)
      # ignore.
    end
#receive_goaway(frame) ⇒ Object
    # File 'lib/protocol/http2/connection.rb', line 200
    def receive_goaway(frame)
      # We capture the last stream that was processed.
      @remote_stream_id, error_code, message = frame.unpack

      self.close!

      if error_code != 0
        # Shut down immediately.
        raise GoawayError.new(message, error_code)
      end
    end
#receive_headers(frame) ⇒ Object
On the server side, starts a new request.
    # File 'lib/protocol/http2/connection.rb', line 374
    def receive_headers(frame)
      stream_id = frame.stream_id

      if stream_id.zero?
        raise ProtocolError, "Cannot receive headers for stream 0!"
      end

      if stream = @streams[stream_id]
        stream.receive_headers(frame)
      else
        if stream_id <= @remote_stream_id
          raise ProtocolError, "Invalid stream id: #{stream_id} <= #{@remote_stream_id}!"
        end

        # We need to validate that we have less streams than the specified maximum:
        if @streams.size < @local_settings.maximum_concurrent_streams
          stream = accept_stream(stream_id)
          @remote_stream_id = stream_id

          stream.receive_headers(frame)
        else
          raise ProtocolError, "Exceeded maximum concurrent streams"
        end
      end
    end
#receive_ping(frame) ⇒ Object
    # File 'lib/protocol/http2/connection.rb', line 304
    def receive_ping(frame)
      if @state != :closed
        # This is handled in `read_payload`:
        # if frame.stream_id != 0
        #   raise ProtocolError, "Ping received for non-zero stream!"
        # end

        unless frame.acknowledgement?
          reply = frame.acknowledge

          write_frame(reply)
        end
      else
        raise ProtocolError, "Cannot receive ping in state #{@state}"
      end
    end
#receive_priority_update(frame) ⇒ Object
    # File 'lib/protocol/http2/connection.rb', line 404
    def receive_priority_update(frame)
      if frame.stream_id != 0
        raise ProtocolError, "Invalid stream id: #{frame.stream_id}"
      end

      stream_id, value = frame.unpack

      # Apparently you can set the priority of idle streams, but I'm not sure why that makes sense, so for now let's ignore it.
      if stream = @streams[stream_id]
        stream.priority = Protocol::HTTP::Header::Priority.new(value)
      end
    end
#receive_push_promise(frame) ⇒ Object
    # File 'lib/protocol/http2/connection.rb', line 400
    def receive_push_promise(frame)
      raise ProtocolError, "Unable to receive push promise!"
    end
#receive_reset_stream(frame) ⇒ Object
    # File 'lib/protocol/http2/connection.rb', line 453
    def receive_reset_stream(frame)
      if frame.connection?
        raise ProtocolError, "Cannot reset connection!"
      elsif stream = @streams[frame.stream_id]
        stream.receive_reset_stream(frame)
      elsif closed_stream_id?(frame.stream_id)
        # Ignore.
      else
        raise StreamClosed, "Cannot reset stream #{frame.stream_id}"
      end
    end
#receive_settings(frame) ⇒ Object
    # File 'lib/protocol/http2/connection.rb', line 282
    def receive_settings(frame)
      if @state == :new
        # We transition to :open when we receive acknowledgement of first settings frame:
        open! if process_settings(frame)
      elsif @state != :closed
        process_settings(frame)
      else
        raise ProtocolError, "Cannot receive settings in state #{@state}"
      end
    end
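The state transition in `#receive_settings` can be reduced to the standalone sketch below. `SettingsHandshake` and `SketchProtocolError` are hypothetical names for this example; the boolean argument stands in for whether the received SETTINGS frame is an acknowledgement.

```ruby
# Hypothetical error class standing in for the library's ProtocolError.
class SketchProtocolError < StandardError; end

# Hypothetical reduction of the connection's settings state handling.
class SettingsHandshake
  attr_reader :state

  def initialize
    @state = :new
  end

  # `acknowledgement` is true when the received SETTINGS frame has the ACK flag set.
  def receive_settings(acknowledgement)
    if @state == :closed
      raise SketchProtocolError, "Cannot receive settings in state #{@state}"
    end

    # Only the acknowledgement of our first SETTINGS frame opens the connection.
    @state = :open if @state == :new && acknowledgement
    @state
  end

  def close!
    @state = :closed
    self
  end
end
```

A peer's own SETTINGS frame leaves the state at `:new`; the first acknowledgement moves it to `:open`, and any SETTINGS frame after `close!` raises.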
#receive_window_update(frame) ⇒ Object
    # File 'lib/protocol/http2/connection.rb', line 478
    def receive_window_update(frame)
      if frame.connection?
        super

        self.consume_window
      elsif stream = @streams[frame.stream_id]
        begin
          stream.receive_window_update(frame)
        rescue ProtocolError => error
          stream.send_reset_stream(error.code)
        end
      elsif closed_stream_id?(frame.stream_id)
        # Ignore.
      else
        # Receiving any frame other than HEADERS or PRIORITY on a stream in this state (idle) MUST be treated as a connection error of type PROTOCOL_ERROR.
        raise ProtocolError, "Cannot update window of idle stream #{frame.stream_id}"
      end
    end
#send_goaway(error_code = 0, message = "") ⇒ Object
Tell the remote end that the connection is being shut down. If the `error_code` is 0, this is a graceful shutdown. The other end of the connection should not make any new streams, but existing streams may be completed.
    # File 'lib/protocol/http2/connection.rb', line 191
    def send_goaway(error_code = 0, message = "")
      frame = GoawayFrame.new
      frame.pack @remote_stream_id, error_code, message

      write_frame(frame)
    ensure
      self.close!
    end
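For orientation, the three values passed to `frame.pack` correspond to the GOAWAY payload fields defined by the HTTP/2 specification: a 31-bit last stream id, a 32-bit error code, and optional debug data. The sketch below packs and unpacks that layout with plain `Array#pack`; `pack_goaway_payload` and `unpack_goaway_payload` are hypothetical helpers, not the library's `GoawayFrame` methods.

```ruby
# Hypothetical helpers showing the GOAWAY payload layout, for illustration only.
def pack_goaway_payload(last_stream_id, error_code, message = "")
  # Two 32-bit big-endian integers (the top bit of the first is reserved),
  # followed by opaque debug data.
  [last_stream_id & 0x7FFFFFFF, error_code].pack("NN") + message.b
end

def unpack_goaway_payload(payload)
  last_stream_id, error_code = payload.unpack("NN")
  debug_data = payload.byteslice(8, payload.bytesize - 8)
  [last_stream_id & 0x7FFFFFFF, error_code, debug_data]
end
```

A graceful shutdown after stream 5 would pack `5`, error code `0`, and an empty or short human-readable message into a payload of at least 8 bytes.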
#send_ping(data) ⇒ Object
    # File 'lib/protocol/http2/connection.rb', line 293
    def send_ping(data)
      if @state != :closed
        frame = PingFrame.new
        frame.pack data

        write_frame(frame)
      else
        raise ProtocolError, "Cannot send ping in state #{@state}"
      end
    end
#send_settings(changes) ⇒ Object
    # File 'lib/protocol/http2/connection.rb', line 174
    def send_settings(changes)
      @local_settings.append(changes)

      frame = SettingsFrame.new
      frame.pack(changes)

      write_frame(frame)
    end
#server_stream_id?(id) ⇒ Boolean
    # File 'lib/protocol/http2/connection.rb', line 421
    def server_stream_id?(id)
      id.even?
    end
#synchronize ⇒ Object
    # File 'lib/protocol/http2/connection.rb', line 144
    def synchronize
      yield
    end
#update_local_settings(changes) ⇒ Object
    # File 'lib/protocol/http2/connection.rb', line 232
    def update_local_settings(changes)
      capacity = @local_settings.initial_window_size

      @streams.each_value do |stream|
        stream.local_window.capacity = capacity
      end

      @local_window.desired = capacity
    end
#update_remote_settings(changes) ⇒ Object
    # File 'lib/protocol/http2/connection.rb', line 242
    def update_remote_settings(changes)
      capacity = @remote_settings.initial_window_size

      @streams.each_value do |stream|
        stream.remote_window.capacity = capacity
      end
    end
#valid_remote_stream_id?(stream_id) ⇒ Boolean
    # File 'lib/protocol/http2/connection.rb', line 333
    def valid_remote_stream_id?(stream_id)
      false
    end
#write_frame(frame) ⇒ Object
    # File 'lib/protocol/http2/connection.rb', line 212
    def write_frame(frame)
      synchronize do
        @framer.write_frame(frame)
      end

      @framer.flush
    end
#write_frames ⇒ Object
    # File 'lib/protocol/http2/connection.rb', line 220
    def write_frames
      if @framer
        synchronize do
          yield @framer
        end

        @framer.flush
      else
        raise EOFError, "Connection closed!"
      end
    end