Class: HTTP2::Connection

Inherits:
Object
  • Object
show all
Includes:
Emitter, Error, FlowBuffer
Defined in:
lib/http/2/connection.rb

Overview

Connection encapsulates all of the connection, stream, flow-control, error management, and other processing logic required for a well-behaved HTTP 2.0 client.

When the connection object is instantiated you must specify its role (:client or :server) to initialize appropriate header compression and decompression algorithms and stream management logic.

Your code is responsible for feeding data to connection object, which performs all of the necessary HTTP 2.0 decoding, state management and the rest, and vice versa, the parser will emit bytes (encoded HTTP 2.0 frames) that you can then route to the destination. Roughly, this works as follows:

Examples:

socket = YourTransport.new

conn = HTTP2::Connection.new(:client)
conn.on(:frame) {|bytes| socket << bytes }

while bytes = socket.read
  conn << bytes
end

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Emitter

#add_listener, #emit, #once

Methods included from FlowBuffer

#buffered_amount

Constructor Details

#initialize(type = :client) ⇒ Connection

Initializes new client or server connection object.

Parameters:

  • type (Symbol) (defaults to: :client)


65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/http/2/connection.rb', line 65

def initialize(type = :client)
  @type = type

  if @type == :server
    @stream_id    = 2
    @compressor   = Header::Compressor.new(:response)
    @decompressor = Header::Decompressor.new(:request)
  else
    @stream_id    = 1
    @compressor   = Header::Compressor.new(:request)
    @decompressor = Header::Decompressor.new(:response)
  end

  @stream_limit = Float::INFINITY
  @active_stream_count = 0
  @streams = {}

  @framer = Framer.new
  @window = DEFAULT_FLOW_WINDOW
  @window_limit = DEFAULT_FLOW_WINDOW

  @recv_buffer = Buffer.new
  @send_buffer = []
  @continuation = []
  @state = :new
  @error = nil
end

Instance Attribute Details

#active_stream_countObject (readonly)

Number of active streams between client and server (reserved streams are not counted towards the stream limit).



60
61
62
# File 'lib/http/2/connection.rb', line 60

def active_stream_count
  @active_stream_count
end

#errorObject (readonly)

Last connection error if connection is aborted.



48
49
50
# File 'lib/http/2/connection.rb', line 48

def error
  @error
end

#stateObject (readonly)

Connection state (:new, :closed).



45
46
47
# File 'lib/http/2/connection.rb', line 45

def state
  @state
end

#stream_limitObject (readonly)

Maximum number of concurrent streams allowed by the peer (automatically updated on receipt of peer settings).



56
57
58
# File 'lib/http/2/connection.rb', line 56

def stream_limit
  @stream_limit
end

#typeObject (readonly)

Type of connection (:server, :client).



42
43
44
# File 'lib/http/2/connection.rb', line 42

def type
  @type
end

#windowObject (readonly)

Size of current connection flow control window (by default, set to infinity, but is automatically updated on receipt of peer settings).



52
53
54
# File 'lib/http/2/connection.rb', line 52

def window
  @window
end

Instance Method Details

#goaway(error = :no_error, payload = nil) ⇒ Object

Sends a GOAWAY frame indicating that the peer should stop creating new streams for current connection.

Endpoints MAY append opaque data to the payload of any GOAWAY frame. Additional debug data is intended for diagnostic purposes only and carries no semantic value. Debug data MUST NOT be persistently stored, since it could contain sensitive information.

Parameters:

  • error (Symbol) (defaults to: :no_error)
  • payload (String) (defaults to: nil)


127
128
129
130
131
132
133
# File 'lib/http/2/connection.rb', line 127

def goaway(error = :no_error, payload = nil)
  send({
    type: :goaway, last_stream: (@streams.max.first rescue 0),
    error: error, payload: payload
  })
  @state = :closed
end

#new_stream(priority: DEFAULT_PRIORITY, window: @window_limit, parent: nil) ⇒ Object

Allocates new stream for current connection.

Parameters:

  • priority (Integer) (defaults to: DEFAULT_PRIORITY)
  • window (Integer) (defaults to: @window_limit)
  • parent (Stream) (defaults to: nil)

Raises:



98
99
100
101
102
103
104
105
106
# File 'lib/http/2/connection.rb', line 98

def new_stream(priority: DEFAULT_PRIORITY, window: @window_limit, parent: nil)
  raise ConnectionClosed.new if @state == :closed
  raise StreamLimitExceeded.new if @active_stream_count == @stream_limit

  stream = activate_stream(@stream_id, priority, window, parent)
  @stream_id += 2

  stream
end

#ping(payload, &blk) ⇒ Object

Sends PING frame to the peer.

Parameters:

  • payload (String)

    optional payload must be 8 bytes long

  • blk (Proc)

    callback to execute when PONG is received



112
113
114
115
# File 'lib/http/2/connection.rb', line 112

def ping(payload, &blk)
  send({type: :ping, stream: 0, payload: payload})
  once(:pong, &blk) if blk
end

#receive(data) ⇒ Object Also known as: <<

Decodes incoming bytes into HTTP 2.0 frames and routes them to appropriate receivers: connection frames are handled directly, and stream frames are passed to appropriate stream objects.

Parameters:

  • data (String)

    Binary encoded string



150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
# File 'lib/http/2/connection.rb', line 150

def receive(data)
  @recv_buffer << data

  while frame = @framer.parse(@recv_buffer) do
    # Header blocks MUST be transmitted as a contiguous sequence of frames
    # with no interleaved frames of any other type, or from any other stream.
    if !@continuation.empty?
      if frame[:type]  != :continuation ||
         frame[:stream] != @continuation.first[:stream]
        connection_error
      end

      @continuation << frame
      return if !frame[:flags].include? :end_headers

      headers = @continuation.collect do |chunk|
        decode_headers(chunk)
        chunk[:payload]
      end.flatten(1)

      frame = @continuation.shift
      @continuation.clear

      frame.delete(:length)
      frame[:payload] = headers
      frame[:flags] << if frame[:type] == :push_promise
        :end_push_promise
      else
        :end_headers
      end
    end

    # SETTINGS frames always apply to a connection, never a single stream.
    # The stream identifier for a settings frame MUST be zero.  If an
    # endpoint receives a SETTINGS frame whose stream identifier field is
    # anything other than 0x0, the endpoint MUST respond with a connection
    # error (Section 5.4.1) of type PROTOCOL_ERROR.
    if connection_frame?(frame)
      connection_management(frame)
    else
      case frame[:type]
      when :headers
        # The last frame in a sequence of HEADERS/CONTINUATION
        # frames MUST have the END_HEADERS flag set.
        if !frame[:flags].include? :end_headers
          @continuation << frame
          return
        end

        # After sending a GOAWAY frame, the sender can discard frames
        # for new streams.  However, any frames that alter connection
        # state cannot be completely ignored.  For instance, HEADERS,
        # PUSH_PROMISE and CONTINUATION frames MUST be minimally
        # processed to ensure a consistent compression state
        decode_headers(frame)
        return if @state == :closed

        stream = @streams[frame[:stream]]
        if stream.nil?
          stream = activate_stream(frame[:stream],
                                   frame[:priority] || DEFAULT_PRIORITY,
                                   @window_limit)
          emit(:stream, stream)
        end

        stream << frame

      when :push_promise
        # The last frame in a sequence of PUSH_PROMISE/CONTINUATION
        # frames MUST have the END_PUSH_PROMISE/END_HEADERS flag set
        if !frame[:flags].include? :end_push_promise
          @continuation << frame
          return
        end

        decode_headers(frame)
        return if @state == :closed

        # PUSH_PROMISE frames MUST be associated with an existing, peer-
        # initiated stream... A receiver MUST treat the receipt of a
        # PUSH_PROMISE on a stream that is neither "open" nor
        # "half-closed (local)" as a connection error (Section 5.4.1) of
        # type PROTOCOL_ERROR. Similarly, a receiver MUST treat the
        # receipt of a PUSH_PROMISE that promises an illegal stream
        # identifier (Section 5.1.1) (that is, an identifier for a stream
        # that is not currently in the "idle" state) as a connection error
        # (Section 5.4.1) of type PROTOCOL_ERROR, unless the receiver
        # recently sent a RST_STREAM frame to cancel the associated stream.
        parent = @streams[frame[:stream]]
        pid = frame[:promise_stream]

        connection_error(msg: 'missing parent ID') if parent.nil?

        if !(parent.state == :open || parent.state == :half_closed_local)
          # An endpoint might receive a PUSH_PROMISE frame after it sends
          # RST_STREAM.  PUSH_PROMISE causes a stream to become "reserved".
          # The RST_STREAM does not cancel any promised stream.  Therefore, if
          # promised streams are not desired, a RST_STREAM can be used to
          # close any of those streams.
          if parent.closed == :local_rst
            # We can either (a) 'resurrect' the parent, or (b) RST_STREAM
            # ... sticking with (b), might need to revisit later.
            send({type: :rst_stream, stream: pid, error: :refused_stream})
          else
            connection_error
          end
        end

        stream = activate_stream(pid, DEFAULT_PRIORITY, @window_limit, parent)
        emit(:promise, stream)
        stream << frame
      else
        if stream = @streams[frame[:stream]]
          stream << frame
        else
          # An endpoint that receives an unexpected stream identifier
          # MUST respond with a connection error of type PROTOCOL_ERROR.
          connection_error
        end
      end
    end
  end
end

#settings(payload) ⇒ Object

Sends a connection SETTINGS frame to the peer.

Parameters:

  • payload (Hash)

Options Hash (payload):

  • :settings_max_concurrent_streams (Symbol)
  • :settings_flow_control_options (Symbol)
  • :settings_initial_window_size (Symbol)


141
142
143
# File 'lib/http/2/connection.rb', line 141

def settings(payload)
  send({type: :settings, stream: 0, payload: payload})
end