Class: Protocol::HTTP2::Connection
- Inherits:
-
Object
- Object
- Protocol::HTTP2::Connection
- Includes:
- FlowControlled
- Defined in:
- lib/protocol/http2/connection.rb
Instance Attribute Summary collapse
-
#dependencies ⇒ Object
readonly
Returns the value of attribute dependencies.
-
#dependency ⇒ Object
readonly
Returns the value of attribute dependency.
-
#framer ⇒ Object
readonly
Returns the value of attribute framer.
-
#local_settings ⇒ Object
Current settings value for local and peer.
-
#local_window ⇒ Object
readonly
Our window for receiving data.
-
#remote_settings ⇒ Object
Returns the value of attribute remote_settings.
-
#remote_stream_id ⇒ Object
readonly
The highest stream_id that has been successfully accepted by this connection.
-
#remote_window ⇒ Object
readonly
Our window for sending data.
-
#state ⇒ Object
Connection state (:new, :open, :closed).
-
#streams ⇒ Object
readonly
Returns the value of attribute streams.
Instance Method Summary collapse
- #[](id) ⇒ Object
-
#accept_push_promise_stream(stream_id, &block) ⇒ Object
Accept an incoming push promise from the other side of the connection.
-
#accept_stream(stream_id, &block) ⇒ Object
Accept an incoming stream from the other side of the connnection.
- #client_stream_id?(id) ⇒ Boolean
-
#close(error = nil) ⇒ Object
Close the underlying framer and all streams.
-
#close! ⇒ Object
Transition the connection into the closed state.
-
#closed? ⇒ Boolean
Whether the connection is effectively or actually closed.
-
#closed_stream_id?(id) ⇒ Boolean
This is only valid if the stream doesn’t exist in ‘@streams`.
-
#consume_window(size = self.available_size) ⇒ Object
Traverse active streams in order of priority and allow them to consume the available flow-control window.
- #create_push_promise_stream(&block) ⇒ Object
-
#create_stream(id = next_stream_id, &block) ⇒ Stream
Create a stream, defaults to an outgoing stream.
- #decode_headers(data) ⇒ Object
- #delete(id) ⇒ Object
- #encode_headers(headers, buffer = String.new.b) ⇒ Object
- #id ⇒ Object
- #idle_stream_id?(id) ⇒ Boolean
-
#ignore_frame?(frame) ⇒ Boolean
6.8.
-
#initialize(framer, local_stream_id) ⇒ Connection
constructor
A new instance of Connection.
- #maximum_concurrent_streams ⇒ Object
-
#maximum_frame_size ⇒ Object
The size of a frame payload is limited by the maximum size that a receiver advertises in the SETTINGS_MAX_FRAME_SIZE setting.
-
#next_stream_id ⇒ Object
Streams are identified with an unsigned 31-bit integer.
- #open! ⇒ Object
-
#process_settings(frame) ⇒ Boolean
In addition to changing the flow-control window for streams that are not yet active, a SETTINGS frame can alter the initial flow-control window size for streams with active flow-control windows (that is, streams in the “open” or “half-closed (remote)” state).
-
#read_frame ⇒ Object
Reads one frame from the network and processes.
- #receive_continuation(frame) ⇒ Object
- #receive_data(frame) ⇒ Object
- #receive_frame(frame) ⇒ Object
- #receive_goaway(frame) ⇒ Object
-
#receive_headers(frame) ⇒ Object
On the server side, starts a new request.
- #receive_ping(frame) ⇒ Object
-
#receive_priority(frame) ⇒ Object
Sets the priority for an incoming stream.
- #receive_push_promise(frame) ⇒ Object
- #receive_reset_stream(frame) ⇒ Object
- #receive_settings(frame) ⇒ Object
- #receive_window_update(frame) ⇒ Object
-
#send_goaway(error_code = 0, message = "") ⇒ Object
Tell the remote end that the connection is being shut down.
- #send_ping(data) ⇒ Object
- #send_priority(stream_id, priority) ⇒ Object
- #send_settings(changes) ⇒ Object
- #server_stream_id?(id) ⇒ Boolean
- #update_local_settings(changes) ⇒ Object
- #update_remote_settings(changes) ⇒ Object
- #valid_remote_stream_id?(stream_id) ⇒ Boolean
- #write_frame(frame) ⇒ Object
- #write_frames {|@framer| ... } ⇒ Object
Methods included from FlowControlled
#available_frame_size, #available_size, #consume_local_window, #consume_remote_window, #request_window_update, #send_window_update, #update_local_window, #window_updated
Constructor Details
#initialize(framer, local_stream_id) ⇒ Connection
Returns a new instance of Connection.
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/protocol/http2/connection.rb', line 18 def initialize(framer, local_stream_id) super() @state = :new # Hash(Integer, Stream) @streams = {} # Hash(Integer, Dependency) @dependency = Dependency.new(self, 0) @dependencies = {0 => @dependency} @framer = framer # The next stream id to use: @local_stream_id = local_stream_id # The biggest remote stream id seen thus far: @remote_stream_id = 0 @local_settings = PendingSettings.new @remote_settings = Settings.new @decoder = HPACK::Context.new @encoder = HPACK::Context.new @local_window = LocalWindow.new() @remote_window = Window.new end |
Instance Attribute Details
#dependencies ⇒ Object (readonly)
Returns the value of attribute dependencies.
129 130 131 |
# File 'lib/protocol/http2/connection.rb', line 129 def dependencies @dependencies end |
#dependency ⇒ Object (readonly)
Returns the value of attribute dependency.
131 132 133 |
# File 'lib/protocol/http2/connection.rb', line 131 def dependency @dependency end |
#framer ⇒ Object (readonly)
Returns the value of attribute framer.
69 70 71 |
# File 'lib/protocol/http2/connection.rb', line 69 def framer @framer end |
#local_settings ⇒ Object
Current settings value for local and peer
75 76 77 |
# File 'lib/protocol/http2/connection.rb', line 75 def local_settings @local_settings end |
#local_window ⇒ Object (readonly)
Our window for receiving data. When we receive data, it reduces this window. If the window gets too small, we must send a window update.
80 81 82 |
# File 'lib/protocol/http2/connection.rb', line 80 def local_window @local_window end |
#remote_settings ⇒ Object
Returns the value of attribute remote_settings.
76 77 78 |
# File 'lib/protocol/http2/connection.rb', line 76 def remote_settings @remote_settings end |
#remote_stream_id ⇒ Object (readonly)
The highest stream_id that has been successfully accepted by this connection.
86 87 88 |
# File 'lib/protocol/http2/connection.rb', line 86 def remote_stream_id @remote_stream_id end |
#remote_window ⇒ Object (readonly)
Our window for sending data. When we send data, it reduces this window.
83 84 85 |
# File 'lib/protocol/http2/connection.rb', line 83 def remote_window @remote_window end |
#state ⇒ Object
Connection state (:new, :open, :closed).
72 73 74 |
# File 'lib/protocol/http2/connection.rb', line 72 def state @state end |
#streams ⇒ Object (readonly)
Returns the value of attribute streams.
127 128 129 |
# File 'lib/protocol/http2/connection.rb', line 127 def streams @streams end |
Instance Method Details
#[](id) ⇒ Object
52 53 54 55 56 57 58 |
# File 'lib/protocol/http2/connection.rb', line 52 def [] id if id.zero? self else @streams[id] end end |
#accept_push_promise_stream(stream_id, &block) ⇒ Object
Accept an incoming push promise from the other side of the connection. On the client side, we accept push promise streams. On the server side, existing streams create push promise streams.
334 335 336 |
# File 'lib/protocol/http2/connection.rb', line 334 def accept_push_promise_stream(stream_id, &block) accept_stream(stream_id, &block) end |
#accept_stream(stream_id, &block) ⇒ Object
Accept an incoming stream from the other side of the connnection. On the server side, we accept requests.
323 324 325 326 327 328 329 |
# File 'lib/protocol/http2/connection.rb', line 323 def accept_stream(stream_id, &block) unless valid_remote_stream_id?(stream_id) raise ProtocolError, "Invalid stream id: #{stream_id}" end create_stream(stream_id, &block) end |
#client_stream_id?(id) ⇒ Boolean
403 404 405 |
# File 'lib/protocol/http2/connection.rb', line 403 def client_stream_id?(id) id.odd? end |
#close(error = nil) ⇒ Object
Close the underlying framer and all streams.
99 100 101 102 103 104 105 106 107 108 |
# File 'lib/protocol/http2/connection.rb', line 99 def close(error = nil) # The underlying socket may already be closed by this point. @streams.each_value{|stream| stream.close(error)} @streams.clear if @framer @framer.close @framer = nil end end |
#close! ⇒ Object
Transition the connection into the closed state.
180 181 182 183 184 |
# File 'lib/protocol/http2/connection.rb', line 180 def close! @state = :closed return self end |
#closed? ⇒ Boolean
Whether the connection is effectively or actually closed.
89 90 91 |
# File 'lib/protocol/http2/connection.rb', line 89 def closed? @state == :closed || @framer.nil? end |
#closed_stream_id?(id) ⇒ Boolean
This is only valid if the stream doesn’t exist in ‘@streams`.
430 431 432 433 434 435 436 437 |
# File 'lib/protocol/http2/connection.rb', line 430 def closed_stream_id?(id) if id.zero? # The connection "stream id" can never be closed: false else !idle_stream_id?(id) end end |
#consume_window(size = self.available_size) ⇒ Object
Traverse active streams in order of priority and allow them to consume the available flow-control window.
453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 |
# File 'lib/protocol/http2/connection.rb', line 453 def consume_window(size = self.available_size) # Return if there is no window to consume: return unless size > 0 # Console.logger.debug(self) do |buffer| # @dependencies.each do |id, dependency| # buffer.puts "- #{dependency}" # end # # buffer.puts # # @dependency.print_hierarchy(buffer) # end @dependency.consume_window(size) end |
#create_push_promise_stream(&block) ⇒ Object
353 354 355 |
# File 'lib/protocol/http2/connection.rb', line 353 def create_push_promise_stream(&block) create_stream(&block) end |
#create_stream(id = next_stream_id, &block) ⇒ Stream
Create a stream, defaults to an outgoing stream. On the client side, we create requests.
341 342 343 344 345 346 347 348 349 350 351 |
# File 'lib/protocol/http2/connection.rb', line 341 def create_stream(id = next_stream_id, &block) if @streams.key?(id) raise ProtocolError, "Cannot create stream with id #{id}, already exists!" end if block_given? return yield(self, id) else return Stream.create(self, id) end end |
#decode_headers(data) ⇒ Object
114 115 116 |
# File 'lib/protocol/http2/connection.rb', line 114 def decode_headers(data) HPACK::Decompressor.new(data, @decoder, table_size_limit: @local_settings.header_table_size).decode end |
#delete(id) ⇒ Object
93 94 95 96 |
# File 'lib/protocol/http2/connection.rb', line 93 def delete(id) @streams.delete(id) @dependencies[id]&.delete! end |
#encode_headers(headers, buffer = String.new.b) ⇒ Object
110 111 112 |
# File 'lib/protocol/http2/connection.rb', line 110 def encode_headers(headers, buffer = String.new.b) HPACK::Compressor.new(buffer, @encoder, table_size_limit: @remote_settings.header_table_size).encode(headers) end |
#id ⇒ Object
48 49 50 |
# File 'lib/protocol/http2/connection.rb', line 48 def id 0 end |
#idle_stream_id?(id) ⇒ Boolean
411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 |
# File 'lib/protocol/http2/connection.rb', line 411 def idle_stream_id?(id) if id.even? # Server-initiated streams are even. if @local_stream_id.even? id >= @local_stream_id else id > @remote_stream_id end elsif id.odd? # Client-initiated streams are odd. if @local_stream_id.odd? id >= @local_stream_id else id > @remote_stream_id end end end |
#ignore_frame?(frame) ⇒ Boolean
6.8. GOAWAY There is an inherent race condition between an endpoint starting new streams and the remote sending a GOAWAY frame. To deal with this case, the GOAWAY contains the stream identifier of the last peer-initiated stream that was or might be processed on the sending endpoint in this connection. For instance, if the server sends a GOAWAY frame, the identified stream is the highest-numbered stream initiated by the client. Once sent, the sender will ignore frames sent on streams initiated by the receiver if the stream has an identifier higher than the included last stream identifier. Receivers of a GOAWAY frame MUST NOT open additional streams on the connection, although a new connection can be established for new streams.
136 137 138 139 140 141 142 143 |
# File 'lib/protocol/http2/connection.rb', line 136 def ignore_frame?(frame) if self.closed? # puts "ignore_frame? #{frame.stream_id} -> #{valid_remote_stream_id?(frame.stream_id)} > #{@remote_stream_id}" if valid_remote_stream_id?(frame.stream_id) return frame.stream_id > @remote_stream_id end end end |
#maximum_concurrent_streams ⇒ Object
65 66 67 |
# File 'lib/protocol/http2/connection.rb', line 65 def maximum_concurrent_streams @local_settings.maximum_concurrent_streams end |
#maximum_frame_size ⇒ Object
The size of a frame payload is limited by the maximum size that a receiver advertises in the SETTINGS_MAX_FRAME_SIZE setting.
61 62 63 |
# File 'lib/protocol/http2/connection.rb', line 61 def maximum_frame_size @remote_settings.maximum_frame_size end |
#next_stream_id ⇒ Object
Streams are identified with an unsigned 31-bit integer. Streams initiated by a client MUST use odd-numbered stream identifiers; those initiated by the server MUST use even-numbered stream identifiers. A stream identifier of zero (0x0) is used for connection control messages; the stream identifier of zero cannot be used to establish a new stream.
119 120 121 122 123 124 125 |
# File 'lib/protocol/http2/connection.rb', line 119 def next_stream_id id = @local_stream_id @local_stream_id += 2 return id end |
#open! ⇒ Object
260 261 262 263 264 |
# File 'lib/protocol/http2/connection.rb', line 260 def open! @state = :open return self end |
#process_settings(frame) ⇒ Boolean
In addition to changing the flow-control window for streams that are not yet active, a SETTINGS frame can alter the initial flow-control window size for streams with active flow-control windows (that is, streams in the “open” or “half-closed (remote)” state). When the value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the size of all stream flow-control windows that it maintains by the difference between the new value and the old value.
237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 |
# File 'lib/protocol/http2/connection.rb', line 237 def process_settings(frame) if frame.acknowledgement? # The remote end has confirmed the settings have been received: changes = @local_settings.acknowledge update_local_settings(changes) return true else # The remote end is updating the settings, we reply with acknowledgement: reply = frame.acknowledge write_frame(reply) changes = frame.unpack @remote_settings.update(changes) update_remote_settings(changes) return false end end |
#read_frame ⇒ Object
Reads one frame from the network and processes. Processing the frame updates the state of the connection and related streams. If the frame triggers an error, e.g. a protocol error, the connection will typically emit a goaway frame and re-raise the exception. You should continue processing frames until the underlying connection is closed.
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
# File 'lib/protocol/http2/connection.rb', line 146 def read_frame frame = @framer.read_frame(@local_settings.maximum_frame_size) # puts "#{self.class} #{@state} read_frame: class=#{frame.class} stream_id=#{frame.stream_id} flags=#{frame.flags} length=#{frame.length} (remote_stream_id=#{@remote_stream_id})" # puts "Windows: local_window=#{@local_window.inspect}; remote_window=#{@remote_window.inspect}" return if ignore_frame?(frame) yield frame if block_given? frame.apply(self) return frame rescue GoawayError => error # Go directly to jail. Do not pass go, do not collect $200. raise rescue ProtocolError => error send_goaway(error.code || PROTOCOL_ERROR, error.) raise rescue HPACK::Error => error send_goaway(COMPRESSION_ERROR, error.) raise end |
#receive_continuation(frame) ⇒ Object
489 490 491 |
# File 'lib/protocol/http2/connection.rb', line 489 def receive_continuation(frame) raise ProtocolError, "Received unexpected continuation: #{frame.class}" end |
#receive_data(frame) ⇒ Object
305 306 307 308 309 310 311 312 313 314 315 |
# File 'lib/protocol/http2/connection.rb', line 305 def receive_data(frame) update_local_window(frame) if stream = @streams[frame.stream_id] stream.receive_data(frame) elsif closed_stream_id?(frame.stream_id) # This can occur if one end sent a stream reset, while the other end was sending a data frame. It's mostly harmless. else raise ProtocolError, "Cannot receive data for stream id #{frame.stream_id}" end end |
#receive_frame(frame) ⇒ Object
493 494 495 |
# File 'lib/protocol/http2/connection.rb', line 493 def receive_frame(frame) # ignore. end |
#receive_goaway(frame) ⇒ Object
196 197 198 199 200 201 202 203 204 205 206 |
# File 'lib/protocol/http2/connection.rb', line 196 def receive_goaway(frame) # We capture the last stream that was processed. @remote_stream_id, error_code, = frame.unpack self.close! if error_code != 0 # Shut down immediately. raise GoawayError.new(, error_code) end end |
#receive_headers(frame) ⇒ Object
On the server side, starts a new request.
358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 |
# File 'lib/protocol/http2/connection.rb', line 358 def receive_headers(frame) stream_id = frame.stream_id if stream_id.zero? raise ProtocolError, "Cannot receive headers for stream 0!" end if stream = @streams[stream_id] stream.receive_headers(frame) else if stream_id <= @remote_stream_id raise ProtocolError, "Invalid stream id: #{stream_id} <= #{@remote_stream_id}!" end if @streams.size < self.maximum_concurrent_streams stream = accept_stream(stream_id) @remote_stream_id = stream_id stream.receive_headers(frame) else raise ProtocolError, "Exceeded maximum concurrent streams" end end end |
#receive_ping(frame) ⇒ Object
288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 |
# File 'lib/protocol/http2/connection.rb', line 288 def receive_ping(frame) if @state != :closed # This is handled in `read_payload`: # if frame.stream_id != 0 # raise ProtocolError, "Ping received for non-zero stream!" # end unless frame.acknowledgement? reply = frame.acknowledge write_frame(reply) end else raise ProtocolError, "Cannot receive ping in state #{@state}" end end |
#receive_priority(frame) ⇒ Object
Sets the priority for an incoming stream.
391 392 393 394 395 396 397 |
# File 'lib/protocol/http2/connection.rb', line 391 def receive_priority(frame) if dependency = @dependencies[frame.stream_id] dependency.receive_priority(frame) elsif idle_stream_id?(frame.stream_id) Dependency.create(self, frame.stream_id, frame.unpack) end end |
#receive_push_promise(frame) ⇒ Object
399 400 401 |
# File 'lib/protocol/http2/connection.rb', line 399 def receive_push_promise(frame) raise ProtocolError, "Unable to receive push promise!" end |
#receive_reset_stream(frame) ⇒ Object
439 440 441 442 443 444 445 446 447 448 449 |
# File 'lib/protocol/http2/connection.rb', line 439 def receive_reset_stream(frame) if frame.connection? raise ProtocolError, "Cannot reset connection!" elsif stream = @streams[frame.stream_id] stream.receive_reset_stream(frame) elsif closed_stream_id?(frame.stream_id) # Ignore. else raise StreamClosed, "Cannot reset stream #{frame.stream_id}" end end |
#receive_settings(frame) ⇒ Object
266 267 268 269 270 271 272 273 274 275 |
# File 'lib/protocol/http2/connection.rb', line 266 def receive_settings(frame) if @state == :new # We transition to :open when we receive acknowledgement of first settings frame: open! if process_settings(frame) elsif @state != :closed process_settings(frame) else raise ProtocolError, "Cannot receive settings in state #{@state}" end end |
#receive_window_update(frame) ⇒ Object
470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 |
# File 'lib/protocol/http2/connection.rb', line 470 def receive_window_update(frame) if frame.connection? super self.consume_window elsif stream = @streams[frame.stream_id] begin stream.receive_window_update(frame) rescue ProtocolError => error stream.send_reset_stream(error.code) end elsif closed_stream_id?(frame.stream_id) # Ignore. else # Receiving any frame other than HEADERS or PRIORITY on a stream in this state (idle) MUST be treated as a connection error of type PROTOCOL_ERROR. raise ProtocolError, "Cannot update window of idle stream #{frame.stream_id}" end end |
#send_goaway(error_code = 0, message = "") ⇒ Object
Tell the remote end that the connection is being shut down. If the error_code is 0, this is a graceful shutdown. The other end of the connection should not make any new streams, but existing streams may be completed.
187 188 189 190 191 192 193 194 |
# File 'lib/protocol/http2/connection.rb', line 187 def send_goaway(error_code = 0, = "") frame = GoawayFrame.new frame.pack @remote_stream_id, error_code, write_frame(frame) ensure self.close! end |
#send_ping(data) ⇒ Object
277 278 279 280 281 282 283 284 285 286 |
# File 'lib/protocol/http2/connection.rb', line 277 def send_ping(data) if @state != :closed frame = PingFrame.new frame.pack data write_frame(frame) else raise ProtocolError, "Cannot send ping in state #{@state}" end end |
#send_priority(stream_id, priority) ⇒ Object
383 384 385 386 387 388 |
# File 'lib/protocol/http2/connection.rb', line 383 def send_priority(stream_id, priority) frame = PriorityFrame.new(stream_id) frame.pack(priority) write_frame(frame) end |
#send_settings(changes) ⇒ Object
170 171 172 173 174 175 176 177 |
# File 'lib/protocol/http2/connection.rb', line 170 def send_settings(changes) @local_settings.append(changes) frame = SettingsFrame.new frame.pack(changes) write_frame(frame) end |
#server_stream_id?(id) ⇒ Boolean
407 408 409 |
# File 'lib/protocol/http2/connection.rb', line 407 def server_stream_id?(id) id.even? end |
#update_local_settings(changes) ⇒ Object
216 217 218 219 220 221 222 223 224 |
# File 'lib/protocol/http2/connection.rb', line 216 def update_local_settings(changes) capacity = @local_settings.initial_window_size @streams.each_value do |stream| stream.local_window.capacity = capacity end @local_window.desired = capacity end |
#update_remote_settings(changes) ⇒ Object
226 227 228 229 230 231 232 |
# File 'lib/protocol/http2/connection.rb', line 226 def update_remote_settings(changes) capacity = @remote_settings.initial_window_size @streams.each_value do |stream| stream.remote_window.capacity = capacity end end |
#valid_remote_stream_id?(stream_id) ⇒ Boolean
317 318 319 |
# File 'lib/protocol/http2/connection.rb', line 317 def valid_remote_stream_id?(stream_id) false end |
#write_frame(frame) ⇒ Object
208 209 210 |
# File 'lib/protocol/http2/connection.rb', line 208 def write_frame(frame) @framer.write_frame(frame) end |
#write_frames {|@framer| ... } ⇒ Object
212 213 214 |
# File 'lib/protocol/http2/connection.rb', line 212 def write_frames yield @framer end |