Class: Protocol::HTTP2::Connection
- Inherits:
-
Object
- Object
- Protocol::HTTP2::Connection
- Includes:
- FlowControlled
- Defined in:
- lib/protocol/http2/connection.rb
Instance Attribute Summary collapse
-
#dependencies ⇒ Object
readonly
Returns the value of attribute dependencies.
-
#dependency ⇒ Object
readonly
Returns the value of attribute dependency.
-
#framer ⇒ Object
readonly
Returns the value of attribute framer.
-
#local_settings ⇒ Object
Current settings value for local and peer.
-
#local_window ⇒ Object
readonly
Our window for receiving data.
-
#remote_settings ⇒ Object
Returns the value of attribute remote_settings.
-
#remote_stream_id ⇒ Object
readonly
The highest stream_id that has been successfully accepted by this connection.
-
#remote_window ⇒ Object
readonly
Our window for sending data.
-
#state ⇒ Object
Connection state (:new, :open, :closed).
-
#streams ⇒ Object
readonly
Returns the value of attribute streams.
Instance Method Summary collapse
- #[](id) ⇒ Object
-
#accept_push_promise_stream(stream_id, &block) ⇒ Object
Accept an incoming push promise from the other side of the connection.
-
#accept_stream(stream_id, &block) ⇒ Object
Accept an incoming stream from the other side of the connnection.
- #client_stream_id?(id) ⇒ Boolean
-
#close(error = nil) ⇒ Object
Close the underlying framer and all streams.
-
#close! ⇒ Object
Transition the connection into the closed state.
-
#closed? ⇒ Boolean
Whether the connection is effectively or actually closed.
-
#closed_stream_id?(id) ⇒ Boolean
This is only valid if the stream doesn’t exist in ‘@streams`.
-
#consume_window(size = self.available_size) ⇒ Object
Traverse active streams in order of priority and allow them to consume the available flow-control window.
- #create_push_promise_stream(&block) ⇒ Object
-
#create_stream(id = next_stream_id, &block) ⇒ Stream
Create a stream, defaults to an outgoing stream.
- #decode_headers(data) ⇒ Object
- #delete(id) ⇒ Object
- #encode_headers(headers, buffer = String.new.b) ⇒ Object
- #id ⇒ Object
- #idle_stream_id?(id) ⇒ Boolean
-
#ignore_frame?(frame) ⇒ Boolean
6.8.
-
#initialize(framer, local_stream_id) ⇒ Connection
constructor
A new instance of Connection.
- #maximum_concurrent_streams ⇒ Object
-
#maximum_frame_size ⇒ Object
The size of a frame payload is limited by the maximum size that a receiver advertises in the SETTINGS_MAX_FRAME_SIZE setting.
-
#next_stream_id ⇒ Object
Streams are identified with an unsigned 31-bit integer.
- #open! ⇒ Object
-
#process_settings(frame) ⇒ Boolean
In addition to changing the flow-control window for streams that are not yet active, a SETTINGS frame can alter the initial flow-control window size for streams with active flow-control windows (that is, streams in the “open” or “half-closed (remote)” state).
-
#read_frame ⇒ Object
Reads one frame from the network and processes.
- #receive_continuation(frame) ⇒ Object
- #receive_data(frame) ⇒ Object
- #receive_frame(frame) ⇒ Object
- #receive_goaway(frame) ⇒ Object
-
#receive_headers(frame) ⇒ Object
On the server side, starts a new request.
- #receive_ping(frame) ⇒ Object
-
#receive_priority(frame) ⇒ Object
Sets the priority for an incoming stream.
- #receive_push_promise(frame) ⇒ Object
- #receive_reset_stream(frame) ⇒ Object
- #receive_settings(frame) ⇒ Object
- #receive_window_update(frame) ⇒ Object
-
#send_goaway(error_code = 0, message = "") ⇒ Object
Tell the remote end that the connection is being shut down.
- #send_ping(data) ⇒ Object
- #send_priority(stream_id, priority) ⇒ Object
- #send_settings(changes) ⇒ Object
- #server_stream_id?(id) ⇒ Boolean
- #update_local_settings(changes) ⇒ Object
- #update_remote_settings(changes) ⇒ Object
- #valid_remote_stream_id?(stream_id) ⇒ Boolean
- #write_frame(frame) ⇒ Object
- #write_frames {|@framer| ... } ⇒ Object
Methods included from FlowControlled
#available_frame_size, #available_size, #consume_local_window, #consume_remote_window, #request_window_update, #send_window_update, #update_local_window, #window_updated
Constructor Details
#initialize(framer, local_stream_id) ⇒ Connection
Returns a new instance of Connection.
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/protocol/http2/connection.rb', line 18 def initialize(framer, local_stream_id) super() @state = :new # Hash(Integer, Stream) @streams = {} # Hash(Integer, Dependency) @dependency = Dependency.new(self, 0) @dependencies = {0 => @dependency} @framer = framer # The next stream id to use: @local_stream_id = local_stream_id # The biggest remote stream id seen thus far: @remote_stream_id = 0 @local_settings = PendingSettings.new @remote_settings = Settings.new @decoder = HPACK::Context.new @encoder = HPACK::Context.new @local_window = LocalWindow.new() @remote_window = Window.new end |
Instance Attribute Details
#dependencies ⇒ Object (readonly)
Returns the value of attribute dependencies.
130 131 132 |
# File 'lib/protocol/http2/connection.rb', line 130 def dependencies @dependencies end |
#dependency ⇒ Object (readonly)
Returns the value of attribute dependency.
132 133 134 |
# File 'lib/protocol/http2/connection.rb', line 132 def dependency @dependency end |
#framer ⇒ Object (readonly)
Returns the value of attribute framer.
69 70 71 |
# File 'lib/protocol/http2/connection.rb', line 69 def framer @framer end |
#local_settings ⇒ Object
Current settings value for local and peer
75 76 77 |
# File 'lib/protocol/http2/connection.rb', line 75 def local_settings @local_settings end |
#local_window ⇒ Object (readonly)
Our window for receiving data. When we receive data, it reduces this window. If the window gets too small, we must send a window update.
80 81 82 |
# File 'lib/protocol/http2/connection.rb', line 80 def local_window @local_window end |
#remote_settings ⇒ Object
Returns the value of attribute remote_settings.
76 77 78 |
# File 'lib/protocol/http2/connection.rb', line 76 def remote_settings @remote_settings end |
#remote_stream_id ⇒ Object (readonly)
The highest stream_id that has been successfully accepted by this connection.
86 87 88 |
# File 'lib/protocol/http2/connection.rb', line 86 def remote_stream_id @remote_stream_id end |
#remote_window ⇒ Object (readonly)
Our window for sending data. When we send data, it reduces this window.
83 84 85 |
# File 'lib/protocol/http2/connection.rb', line 83 def remote_window @remote_window end |
#state ⇒ Object
Connection state (:new, :open, :closed).
72 73 74 |
# File 'lib/protocol/http2/connection.rb', line 72 def state @state end |
#streams ⇒ Object (readonly)
Returns the value of attribute streams.
128 129 130 |
# File 'lib/protocol/http2/connection.rb', line 128 def streams @streams end |
Instance Method Details
#[](id) ⇒ Object
52 53 54 55 56 57 58 |
# File 'lib/protocol/http2/connection.rb', line 52 def [] id if id.zero? self else @streams[id] end end |
#accept_push_promise_stream(stream_id, &block) ⇒ Object
Accept an incoming push promise from the other side of the connection. On the client side, we accept push promise streams. On the server side, existing streams create push promise streams.
335 336 337 |
# File 'lib/protocol/http2/connection.rb', line 335 def accept_push_promise_stream(stream_id, &block) accept_stream(stream_id, &block) end |
#accept_stream(stream_id, &block) ⇒ Object
Accept an incoming stream from the other side of the connnection. On the server side, we accept requests.
324 325 326 327 328 329 330 |
# File 'lib/protocol/http2/connection.rb', line 324 def accept_stream(stream_id, &block) unless valid_remote_stream_id?(stream_id) raise ProtocolError, "Invalid stream id: #{stream_id}" end create_stream(stream_id, &block) end |
#client_stream_id?(id) ⇒ Boolean
404 405 406 |
# File 'lib/protocol/http2/connection.rb', line 404 def client_stream_id?(id) id.odd? end |
#close(error = nil) ⇒ Object
Close the underlying framer and all streams.
99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/protocol/http2/connection.rb', line 99 def close(error = nil) # The underlying socket may already be closed by this point. @streams.each_value{|stream| stream.close(error)} @streams.clear ensure if @framer @framer.close @framer = nil end end |
#close! ⇒ Object
Transition the connection into the closed state.
181 182 183 184 185 |
# File 'lib/protocol/http2/connection.rb', line 181 def close! @state = :closed return self end |
#closed? ⇒ Boolean
Whether the connection is effectively or actually closed.
89 90 91 |
# File 'lib/protocol/http2/connection.rb', line 89 def closed? @state == :closed || @framer.nil? end |
#closed_stream_id?(id) ⇒ Boolean
This is only valid if the stream doesn’t exist in ‘@streams`.
431 432 433 434 435 436 437 438 |
# File 'lib/protocol/http2/connection.rb', line 431 def closed_stream_id?(id) if id.zero? # The connection "stream id" can never be closed: false else !idle_stream_id?(id) end end |
#consume_window(size = self.available_size) ⇒ Object
Traverse active streams in order of priority and allow them to consume the available flow-control window.
454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 |
# File 'lib/protocol/http2/connection.rb', line 454 def consume_window(size = self.available_size) # Return if there is no window to consume: return unless size > 0 # Console.logger.debug(self) do |buffer| # @dependencies.each do |id, dependency| # buffer.puts "- #{dependency}" # end # # buffer.puts # # @dependency.print_hierarchy(buffer) # end @dependency.consume_window(size) end |
#create_push_promise_stream(&block) ⇒ Object
354 355 356 |
# File 'lib/protocol/http2/connection.rb', line 354 def create_push_promise_stream(&block) create_stream(&block) end |
#create_stream(id = next_stream_id, &block) ⇒ Stream
Create a stream, defaults to an outgoing stream. On the client side, we create requests.
342 343 344 345 346 347 348 349 350 351 352 |
# File 'lib/protocol/http2/connection.rb', line 342 def create_stream(id = next_stream_id, &block) if @streams.key?(id) raise ProtocolError, "Cannot create stream with id #{id}, already exists!" end if block_given? return yield(self, id) else return Stream.create(self, id) end end |
#decode_headers(data) ⇒ Object
115 116 117 |
# File 'lib/protocol/http2/connection.rb', line 115 def decode_headers(data) HPACK::Decompressor.new(data, @decoder, table_size_limit: @local_settings.header_table_size).decode end |
#delete(id) ⇒ Object
93 94 95 96 |
# File 'lib/protocol/http2/connection.rb', line 93 def delete(id) @streams.delete(id) @dependencies[id]&.delete! end |
#encode_headers(headers, buffer = String.new.b) ⇒ Object
111 112 113 |
# File 'lib/protocol/http2/connection.rb', line 111 def encode_headers(headers, buffer = String.new.b) HPACK::Compressor.new(buffer, @encoder, table_size_limit: @remote_settings.header_table_size).encode(headers) end |
#id ⇒ Object
48 49 50 |
# File 'lib/protocol/http2/connection.rb', line 48 def id 0 end |
#idle_stream_id?(id) ⇒ Boolean
412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 |
# File 'lib/protocol/http2/connection.rb', line 412 def idle_stream_id?(id) if id.even? # Server-initiated streams are even. if @local_stream_id.even? id >= @local_stream_id else id > @remote_stream_id end elsif id.odd? # Client-initiated streams are odd. if @local_stream_id.odd? id >= @local_stream_id else id > @remote_stream_id end end end |
#ignore_frame?(frame) ⇒ Boolean
6.8. GOAWAY There is an inherent race condition between an endpoint starting new streams and the remote sending a GOAWAY frame. To deal with this case, the GOAWAY contains the stream identifier of the last peer-initiated stream that was or might be processed on the sending endpoint in this connection. For instance, if the server sends a GOAWAY frame, the identified stream is the highest-numbered stream initiated by the client. Once sent, the sender will ignore frames sent on streams initiated by the receiver if the stream has an identifier higher than the included last stream identifier. Receivers of a GOAWAY frame MUST NOT open additional streams on the connection, although a new connection can be established for new streams.
137 138 139 140 141 142 143 144 |
# File 'lib/protocol/http2/connection.rb', line 137 def ignore_frame?(frame) if self.closed? # puts "ignore_frame? #{frame.stream_id} -> #{valid_remote_stream_id?(frame.stream_id)} > #{@remote_stream_id}" if valid_remote_stream_id?(frame.stream_id) return frame.stream_id > @remote_stream_id end end end |
#maximum_concurrent_streams ⇒ Object
65 66 67 |
# File 'lib/protocol/http2/connection.rb', line 65 def maximum_concurrent_streams @local_settings.maximum_concurrent_streams end |
#maximum_frame_size ⇒ Object
The size of a frame payload is limited by the maximum size that a receiver advertises in the SETTINGS_MAX_FRAME_SIZE setting.
61 62 63 |
# File 'lib/protocol/http2/connection.rb', line 61 def maximum_frame_size @remote_settings.maximum_frame_size end |
#next_stream_id ⇒ Object
Streams are identified with an unsigned 31-bit integer. Streams initiated by a client MUST use odd-numbered stream identifiers; those initiated by the server MUST use even-numbered stream identifiers. A stream identifier of zero (0x0) is used for connection control messages; the stream identifier of zero cannot be used to establish a new stream.
120 121 122 123 124 125 126 |
# File 'lib/protocol/http2/connection.rb', line 120 def next_stream_id id = @local_stream_id @local_stream_id += 2 return id end |
#open! ⇒ Object
261 262 263 264 265 |
# File 'lib/protocol/http2/connection.rb', line 261 def open! @state = :open return self end |
#process_settings(frame) ⇒ Boolean
In addition to changing the flow-control window for streams that are not yet active, a SETTINGS frame can alter the initial flow-control window size for streams with active flow-control windows (that is, streams in the “open” or “half-closed (remote)” state). When the value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the size of all stream flow-control windows that it maintains by the difference between the new value and the old value.
238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 |
# File 'lib/protocol/http2/connection.rb', line 238 def process_settings(frame) if frame.acknowledgement? # The remote end has confirmed the settings have been received: changes = @local_settings.acknowledge update_local_settings(changes) return true else # The remote end is updating the settings, we reply with acknowledgement: reply = frame.acknowledge write_frame(reply) changes = frame.unpack @remote_settings.update(changes) update_remote_settings(changes) return false end end |
#read_frame ⇒ Object
Reads one frame from the network and processes. Processing the frame updates the state of the connection and related streams. If the frame triggers an error, e.g. a protocol error, the connection will typically emit a goaway frame and re-raise the exception. You should continue processing frames until the underlying connection is closed.
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
# File 'lib/protocol/http2/connection.rb', line 147 def read_frame frame = @framer.read_frame(@local_settings.maximum_frame_size) # puts "#{self.class} #{@state} read_frame: class=#{frame.class} stream_id=#{frame.stream_id} flags=#{frame.flags} length=#{frame.length} (remote_stream_id=#{@remote_stream_id})" # puts "Windows: local_window=#{@local_window.inspect}; remote_window=#{@remote_window.inspect}" return if ignore_frame?(frame) yield frame if block_given? frame.apply(self) return frame rescue GoawayError => error # Go directly to jail. Do not pass go, do not collect $200. raise rescue ProtocolError => error send_goaway(error.code || PROTOCOL_ERROR, error.) raise rescue HPACK::Error => error send_goaway(COMPRESSION_ERROR, error.) raise end |
#receive_continuation(frame) ⇒ Object
490 491 492 |
# File 'lib/protocol/http2/connection.rb', line 490 def receive_continuation(frame) raise ProtocolError, "Received unexpected continuation: #{frame.class}" end |
#receive_data(frame) ⇒ Object
306 307 308 309 310 311 312 313 314 315 316 |
# File 'lib/protocol/http2/connection.rb', line 306 def receive_data(frame) update_local_window(frame) if stream = @streams[frame.stream_id] stream.receive_data(frame) elsif closed_stream_id?(frame.stream_id) # This can occur if one end sent a stream reset, while the other end was sending a data frame. It's mostly harmless. else raise ProtocolError, "Cannot receive data for stream id #{frame.stream_id}" end end |
#receive_frame(frame) ⇒ Object
494 495 496 |
# File 'lib/protocol/http2/connection.rb', line 494 def receive_frame(frame) # ignore. end |
#receive_goaway(frame) ⇒ Object
197 198 199 200 201 202 203 204 205 206 207 |
# File 'lib/protocol/http2/connection.rb', line 197 def receive_goaway(frame) # We capture the last stream that was processed. @remote_stream_id, error_code, = frame.unpack self.close! if error_code != 0 # Shut down immediately. raise GoawayError.new(, error_code) end end |
#receive_headers(frame) ⇒ Object
On the server side, starts a new request.
359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 |
# File 'lib/protocol/http2/connection.rb', line 359 def receive_headers(frame) stream_id = frame.stream_id if stream_id.zero? raise ProtocolError, "Cannot receive headers for stream 0!" end if stream = @streams[stream_id] stream.receive_headers(frame) else if stream_id <= @remote_stream_id raise ProtocolError, "Invalid stream id: #{stream_id} <= #{@remote_stream_id}!" end if @streams.size < self.maximum_concurrent_streams stream = accept_stream(stream_id) @remote_stream_id = stream_id stream.receive_headers(frame) else raise ProtocolError, "Exceeded maximum concurrent streams" end end end |
#receive_ping(frame) ⇒ Object
289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 |
# File 'lib/protocol/http2/connection.rb', line 289 def receive_ping(frame) if @state != :closed # This is handled in `read_payload`: # if frame.stream_id != 0 # raise ProtocolError, "Ping received for non-zero stream!" # end unless frame.acknowledgement? reply = frame.acknowledge write_frame(reply) end else raise ProtocolError, "Cannot receive ping in state #{@state}" end end |
#receive_priority(frame) ⇒ Object
Sets the priority for an incoming stream.
392 393 394 395 396 397 398 |
# File 'lib/protocol/http2/connection.rb', line 392 def receive_priority(frame) if dependency = @dependencies[frame.stream_id] dependency.receive_priority(frame) elsif idle_stream_id?(frame.stream_id) Dependency.create(self, frame.stream_id, frame.unpack) end end |
#receive_push_promise(frame) ⇒ Object
400 401 402 |
# File 'lib/protocol/http2/connection.rb', line 400 def receive_push_promise(frame) raise ProtocolError, "Unable to receive push promise!" end |
#receive_reset_stream(frame) ⇒ Object
440 441 442 443 444 445 446 447 448 449 450 |
# File 'lib/protocol/http2/connection.rb', line 440 def receive_reset_stream(frame) if frame.connection? raise ProtocolError, "Cannot reset connection!" elsif stream = @streams[frame.stream_id] stream.receive_reset_stream(frame) elsif closed_stream_id?(frame.stream_id) # Ignore. else raise StreamClosed, "Cannot reset stream #{frame.stream_id}" end end |
#receive_settings(frame) ⇒ Object
267 268 269 270 271 272 273 274 275 276 |
# File 'lib/protocol/http2/connection.rb', line 267 def receive_settings(frame) if @state == :new # We transition to :open when we receive acknowledgement of first settings frame: open! if process_settings(frame) elsif @state != :closed process_settings(frame) else raise ProtocolError, "Cannot receive settings in state #{@state}" end end |
#receive_window_update(frame) ⇒ Object
471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 |
# File 'lib/protocol/http2/connection.rb', line 471 def receive_window_update(frame) if frame.connection? super self.consume_window elsif stream = @streams[frame.stream_id] begin stream.receive_window_update(frame) rescue ProtocolError => error stream.send_reset_stream(error.code) end elsif closed_stream_id?(frame.stream_id) # Ignore. else # Receiving any frame other than HEADERS or PRIORITY on a stream in this state (idle) MUST be treated as a connection error of type PROTOCOL_ERROR. raise ProtocolError, "Cannot update window of idle stream #{frame.stream_id}" end end |
#send_goaway(error_code = 0, message = "") ⇒ Object
Tell the remote end that the connection is being shut down. If the ‘error_code` is 0, this is a graceful shutdown. The other end of the connection should not make any new streams, but existing streams may be completed.
188 189 190 191 192 193 194 195 |
# File 'lib/protocol/http2/connection.rb', line 188 def send_goaway(error_code = 0, = "") frame = GoawayFrame.new frame.pack @remote_stream_id, error_code, write_frame(frame) ensure self.close! end |
#send_ping(data) ⇒ Object
278 279 280 281 282 283 284 285 286 287 |
# File 'lib/protocol/http2/connection.rb', line 278 def send_ping(data) if @state != :closed frame = PingFrame.new frame.pack data write_frame(frame) else raise ProtocolError, "Cannot send ping in state #{@state}" end end |
#send_priority(stream_id, priority) ⇒ Object
384 385 386 387 388 389 |
# File 'lib/protocol/http2/connection.rb', line 384 def send_priority(stream_id, priority) frame = PriorityFrame.new(stream_id) frame.pack(priority) write_frame(frame) end |
#send_settings(changes) ⇒ Object
171 172 173 174 175 176 177 178 |
# File 'lib/protocol/http2/connection.rb', line 171 def send_settings(changes) @local_settings.append(changes) frame = SettingsFrame.new frame.pack(changes) write_frame(frame) end |
#server_stream_id?(id) ⇒ Boolean
408 409 410 |
# File 'lib/protocol/http2/connection.rb', line 408 def server_stream_id?(id) id.even? end |
#update_local_settings(changes) ⇒ Object
217 218 219 220 221 222 223 224 225 |
# File 'lib/protocol/http2/connection.rb', line 217 def update_local_settings(changes) capacity = @local_settings.initial_window_size @streams.each_value do |stream| stream.local_window.capacity = capacity end @local_window.desired = capacity end |
#update_remote_settings(changes) ⇒ Object
227 228 229 230 231 232 233 |
# File 'lib/protocol/http2/connection.rb', line 227 def update_remote_settings(changes) capacity = @remote_settings.initial_window_size @streams.each_value do |stream| stream.remote_window.capacity = capacity end end |
#valid_remote_stream_id?(stream_id) ⇒ Boolean
318 319 320 |
# File 'lib/protocol/http2/connection.rb', line 318 def valid_remote_stream_id?(stream_id) false end |
#write_frame(frame) ⇒ Object
209 210 211 |
# File 'lib/protocol/http2/connection.rb', line 209 def write_frame(frame) @framer.write_frame(frame) end |
#write_frames {|@framer| ... } ⇒ Object
213 214 215 |
# File 'lib/protocol/http2/connection.rb', line 213 def write_frames yield @framer end |