Class: HTTP2Next::Stream

Inherits:
Object
Includes:
Emitter, Error, FlowBuffer
Defined in:
lib/http/2/next/stream.rb

Overview

A single HTTP 2.0 connection can multiplex multiple streams in parallel: multiple requests and responses can be in flight simultaneously, and stream data can be interleaved and prioritized.

This class encapsulates all of the state, transition, flow-control, and error management defined by the HTTP 2.0 specification. All you have to do is subscribe to the appropriate events (marked with a “:” prefix in the diagram below) and provide your application logic to handle request and response processing.

                      +--------+
                 PP   |        |   PP
             ,--------|  idle  |--------.
            /         |        |         \
           v          +--------+          v
    +----------+          |           +----------+
    |          |          | H         |          |
,---|:reserved |          |           |:reserved |---.
|   | (local)  |          v           | (remote) |   |
|   +----------+      +--------+      +----------+   |
|      | :active      |        |      :active |      |
|      |      ,-------|:active |-------.      |      |
|      | H   /   ES   |        |   ES   \   H |      |
|      v    v         +--------+         v    v      |
|   +-----------+          |          +-----------+  |
|   |:half_close|          |          |:half_close|  |
|   |  (remote) |          |          |  (local)  |  |
|   +-----------+          |          +-----------+  |
|        |                 v                |        |
|        |    ES/R    +--------+    ES/R    |        |
|        `----------->|        |<-----------'        |
| R                   | :close |                   R |
`-------------------->|        |<--------------------'
                      +--------+
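
In the diagram, H stands for a HEADERS frame, PP for a PUSH_PROMISE frame, ES for the END_STREAM flag, and R for a RST_STREAM frame. As a rough illustration, the transitions can be written as a lookup table. This is a simplified sketch and not the library's implementation: the `TRANSITIONS` constant, the `next_state` helper, and the event names are made up for this example, and the state names follow the HTTP/2 specification rather than the event symbols shown in the diagram.

```ruby
# Hypothetical transition table: [current_state, event] => next_state.
# The event names (:send_headers, :recv_end_stream, ...) are invented for this sketch.
TRANSITIONS = {
  %i[idle send_push_promise]             => :reserved_local,
  %i[idle recv_push_promise]             => :reserved_remote,
  %i[idle send_headers]                  => :open,
  %i[idle recv_headers]                  => :open,
  %i[reserved_local send_headers]        => :half_closed_remote,
  %i[reserved_remote recv_headers]       => :half_closed_local,
  %i[open send_end_stream]               => :half_closed_local,
  %i[open recv_end_stream]               => :half_closed_remote,
  %i[half_closed_local recv_end_stream]  => :closed,
  %i[half_closed_remote send_end_stream] => :closed
}.freeze

RESET_EVENTS = %i[send_rst_stream recv_rst_stream].freeze

def next_state(state, event)
  # RST_STREAM closes the stream from every state except idle and closed.
  return :closed if RESET_EVENTS.include?(event) && !%i[idle closed].include?(state)

  TRANSITIONS.fetch([state, event]) do
    raise ArgumentError, "no transition from #{state} on #{event}"
  end
end

# A client request: send HEADERS, finish the request, then receive the full response.
state = :idle
%i[send_headers send_end_stream recv_end_stream].each do |event|
  state = next_state(state, event)
end
puts state # closed
```

A table keeps the sketch short, but it ignores details such as PRIORITY and WINDOW_UPDATE frames, which do not change the stream state.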

Constant Summary

Constants included from FlowBuffer

FlowBuffer::MAX_WINDOW_SIZE

Instance Attribute Summary

Instance Method Summary

Methods included from Emitter

#emit, #on, #once
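
The Emitter methods listed above are how application code subscribes to the stream events mentioned in the overview. The sketch below imitates that calling pattern with a small stand-in module. `TinyEmitter` and `FakeStream` are hypothetical names written for this example; they are not the library's Emitter, whose internals may differ.

```ruby
# Hypothetical stand-in that mimics the on/once/emit calling pattern.
module TinyEmitter
  # Registers a persistent callback for the event.
  def on(event, &block)
    raise ArgumentError, "must provide callback" unless block

    listeners(event) << block
  end

  # Registers a callback that removes itself the first time it runs.
  def once(event, &block)
    raise ArgumentError, "must provide callback" unless block

    wrapper = lambda do |*args|
      listeners(event).delete(wrapper)
      block.call(*args)
    end
    listeners(event) << wrapper
  end

  # Invokes every callback registered for the event.
  def emit(event, *args)
    # Iterate over a copy so that once-handlers can remove themselves safely.
    listeners(event).dup.each { |callback| callback.call(*args) }
  end

  private

  def listeners(event)
    @listeners ||= Hash.new { |hash, key| hash[key] = [] }
    @listeners[event]
  end
end

class FakeStream
  include TinyEmitter
end

stream = FakeStream.new
received = []
stream.on(:data)    { |chunk| received << chunk }
stream.once(:close) { received << :closed }

stream.emit(:data, "hello")
stream.emit(:data, "world")
stream.emit(:close)
stream.emit(:close) # ignored: the once-handler already ran

p received # ["hello", "world", :closed]
```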

Methods included from FlowBuffer

#buffered_amount, #flush

Constructor Details

#initialize(connection:, id:, weight: 16, dependency: 0, exclusive: false, parent: nil, state: :idle) ⇒ Stream

Initializes new stream.

Note that you should never have to call this directly. To create a new client-initiated stream, use Connection#new_stream. Similarly, Connection emits new stream objects when frames for a new stream are received.

Parameters:

  • connection (Connection)
  • id (Integer)
  • weight (Integer) (defaults to: 16)
  • dependency (Integer) (defaults to: 0)
  • exclusive (Boolean) (defaults to: false)
  • parent (Stream) (defaults to: nil)
  • state (Symbol) (defaults to: :idle)


# File 'lib/http/2/next/stream.rb', line 76

def initialize(connection:, id:, weight: 16, dependency: 0, exclusive: false, parent: nil, state: :idle)
  stream_error(:protocol_error, msg: "stream can't depend on itself") if id == dependency

  @connection = connection
  @id = id
  @weight = weight
  @dependency = dependency
  process_priority(weight: weight, dependency: dependency, exclusive: exclusive)
  @local_window_max_size = connection.local_settings[:settings_initial_window_size]
  @local_window = connection.local_settings[:settings_initial_window_size]
  @remote_window = connection.remote_settings[:settings_initial_window_size]
  @parent = parent
  @state  = state
  @error  = false
  @closed = false
  @_method = @_content_length = @_status_code = nil
  @_waiting_on_trailers = false
  @received_data = false

  on(:window) { |v| @remote_window = v }
  on(:local_window) { |v| @local_window_max_size = @local_window = v }
end

Instance Attribute Details

#closed ⇒ Object (readonly)

Reason why the stream was closed, such as :local_rst. Initialized to false.



# File 'lib/http/2/next/stream.rb', line 61

def closed
  @closed
end

#dependency ⇒ Object (readonly)

Returns the value of attribute dependency.



# File 'lib/http/2/next/stream.rb', line 54

def dependency
  @dependency
end

#id ⇒ Object (readonly)

Stream ID (odd for client-initiated streams, even otherwise).



# File 'lib/http/2/next/stream.rb', line 44

def id
  @id
end

#local_window ⇒ Object (readonly) Also known as: window

Current size of the stream's local flow-control window.



# File 'lib/http/2/next/stream.rb', line 57

def local_window
  @local_window
end

#parent ⇒ Object (readonly)

Parent request stream of a push stream.



# File 'lib/http/2/next/stream.rb', line 50

def parent
  @parent
end

#remote_window ⇒ Object (readonly)

Returns the value of attribute remote_window.



# File 'lib/http/2/next/stream.rb', line 54

def remote_window
  @remote_window
end

#state ⇒ Object (readonly)

Stream state as defined by HTTP 2.0.



# File 'lib/http/2/next/stream.rb', line 47

def state
  @state
end

#weight ⇒ Object (readonly)

Stream priority as set by initiator.



# File 'lib/http/2/next/stream.rb', line 53

def weight
  @weight
end

Instance Method Details

#calculate_content_length(data_length) ⇒ Object



# File 'lib/http/2/next/stream.rb', line 180

def calculate_content_length(data_length)
  return unless @_content_length && data_length

  @_content_length -= data_length
  return if @_content_length >= 0

  stream_error(:protocol_error, msg: "received more data than what was defined in content-length")
end
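
The countdown above can be tried outside the class with a small stand-in. This is only a sketch: `ContentLengthTracker` and its `Overflow` error are hypothetical names, and raising an exception stands in for the call to stream_error.

```ruby
# Hypothetical helper mirroring the countdown in calculate_content_length.
class ContentLengthTracker
  class Overflow < StandardError; end

  attr_reader :remaining

  def initialize(content_length)
    @remaining = content_length # nil when no content-length was announced
  end

  # Subtracts the size of one DATA frame from the announced length.
  def consume(data_length)
    return unless @remaining && data_length

    @remaining -= data_length
    return if @remaining >= 0

    raise Overflow, "received more data than what was defined in content-length"
  end
end

tracker = ContentLengthTracker.new(10)
tracker.consume(4)
tracker.consume(6)
puts tracker.remaining # 0

begin
  tracker.consume(1)
rescue ContentLengthTracker::Overflow => e
  puts "error: #{e.message}"
end
```

As written, the countdown only detects a body that is longer than announced. It does not flag a body that is shorter.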

#cancel ⇒ Object

Sends a RST_STREAM indicating that the stream is no longer needed.



# File 'lib/http/2/next/stream.rb', line 285

def cancel
  send(type: :rst_stream, error: :cancel)
end

#chunk_data(payload, max_size) ⇒ Object

Splits payload into chunks of at most max_size bytes, yields every chunk except the last one, and returns the final chunk.



# File 'lib/http/2/next/stream.rb', line 266

def chunk_data(payload, max_size)
  total = payload.bytesize
  cursor = 0
  while (total - cursor) > max_size
    yield payload.byteslice(cursor, max_size)
    cursor += max_size
  end
  payload.byteslice(cursor, total - cursor)
end
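
Because the method depends only on its arguments, the loop can be copied into a standalone script to see which chunks are yielded and which one is returned. The sample strings are arbitrary.

```ruby
# Standalone copy of the chunking loop so that it can run outside the class.
def chunk_data(payload, max_size)
  total = payload.bytesize
  cursor = 0
  while (total - cursor) > max_size
    yield payload.byteslice(cursor, max_size)
    cursor += max_size
  end
  payload.byteslice(cursor, total - cursor)
end

yielded = []
last = chunk_data("abcdefgh", 3) { |chunk| yielded << chunk }
p yielded # ["abc", "def"]
p last    # "gh"

# When the payload is an exact multiple of max_size, the last full chunk is
# returned rather than yielded, so a non-empty payload never ends in an empty chunk.
yielded = []
last = chunk_data("abcdef", 3) { |chunk| yielded << chunk }
p yielded # ["abc"]
p last    # "def"
```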

#close(error = :stream_closed) ⇒ Object

Sends a RST_STREAM frame, which closes the current stream. This does not close the underlying connection.

Parameters:

  • error (Symbol) (defaults to: :stream_closed)

    optional reason why stream was closed



# File 'lib/http/2/next/stream.rb', line 280

def close(error = :stream_closed)
  send(type: :rst_stream, error: error)
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/http/2/next/stream.rb', line 99

def closed?
  @state == :closed
end

#data(payload, end_stream: true) ⇒ Object

Sends a DATA frame containing the response payload. A payload larger than the peer's maximum frame size is split across several DATA frames.

Parameters:

  • payload (String)
  • end_stream (Boolean) (defaults to: true)

    indicates last response DATA frame



# File 'lib/http/2/next/stream.rb', line 248

def data(payload, end_stream: true)
  # Split the payload so that each frame fits within the peer's maximum frame size
  # TODO: consider padding?
  max_size = @connection.remote_settings[:settings_max_frame_size]

  if payload.bytesize > max_size
    payload = chunk_data(payload, max_size) do |chunk|
      send(type: :data, flags: [], payload: chunk)
    end
  end

  flags = []
  flags << :end_stream if end_stream
  send(type: :data, flags: flags, payload: payload)
end
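
To see which frames result from a large payload, the splitting and flag handling can be approximated with a helper that collects frames in an array instead of sending them. `build_data_frames` is a hypothetical name, and the frames are plain hashes shaped like the ones passed to send above.

```ruby
# Hypothetical helper: returns the DATA frames that a call such as
# data(payload, end_stream: true) would hand to send, as plain hashes.
def build_data_frames(payload, max_size, end_stream: true)
  frames = []
  cursor = 0
  # Every chunk before the final one goes out without flags.
  while (payload.bytesize - cursor) > max_size
    frames << { type: :data, flags: [], payload: payload.byteslice(cursor, max_size) }
    cursor += max_size
  end
  # Only the final frame can carry END_STREAM.
  flags = end_stream ? [:end_stream] : []
  frames << { type: :data, flags: flags, payload: payload.byteslice(cursor, payload.bytesize - cursor) }
  frames
end

frames = build_data_frames("0123456789", 4)
frames.each do |frame|
  puts "#{frame[:payload].bytesize} bytes, flags=#{frame[:flags].inspect}"
end
# 4 bytes, flags=[]
# 4 bytes, flags=[]
# 2 bytes, flags=[:end_stream]
```

Passing `end_stream: false` leaves the final frame without flags, which suits a body that is streamed across several calls.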

#headers(headers, end_headers: true, end_stream: false) ⇒ Object

Sends a HEADERS frame containing HTTP response headers. All pseudo-header fields MUST appear in the header block before regular header fields.

Parameters:

  • headers (Array or Hash)

    Array of key-value pairs or Hash

  • end_headers (Boolean) (defaults to: true)

    indicates that no more headers will be sent

  • end_stream (Boolean) (defaults to: false)

    indicates that no payload will be sent



# File 'lib/http/2/next/stream.rb', line 219

def headers(headers, end_headers: true, end_stream: false)
  flags = []
  flags << :end_headers if end_headers
  flags << :end_stream  if end_stream || @_method == "HEAD"

  send(type: :headers, flags: flags, payload: headers)
end
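
The flag selection can be isolated into a small function to see how the two keyword arguments and the HEAD case interact. `header_flags` and its `request_method` parameter are hypothetical; `request_method` stands in for the `@_method` value that the stream records from received headers.

```ruby
# Hypothetical helper reproducing the flag selection in #headers.
def header_flags(end_headers: true, end_stream: false, request_method: nil)
  flags = []
  flags << :end_headers if end_headers
  # When the recorded request method is HEAD, the stream ends with the headers.
  flags << :end_stream if end_stream || request_method == "HEAD"
  flags
end

p header_flags                         # [:end_headers]
p header_flags(end_stream: true)       # [:end_headers, :end_stream]
p header_flags(request_method: "HEAD") # [:end_headers, :end_stream]
p header_flags(end_headers: false)     # []
```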

#promise(headers, end_headers: true, &block) ⇒ Object

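Emits a :promise event carrying this stream, the given headers, and the frame flags. A callback block is required.

Raises:

  • (ArgumentError)

    if no block is given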


# File 'lib/http/2/next/stream.rb', line 227

def promise(headers, end_headers: true, &block)
  raise ArgumentError, "must provide callback" unless block_given?

  flags = end_headers ? [:end_headers] : []
  emit(:promise, self, headers, flags, &block)
end

#receive(frame) ⇒ Object Also known as: <<

Processes incoming HTTP 2.0 frames. The frames must be decoded upstream.

Parameters:

  • frame (Hash)


# File 'lib/http/2/next/stream.rb', line 106

def receive(frame)
  transition(frame, false)

  case frame[:type]
  when :data
    # 6.1. DATA
    # If a DATA frame is received whose stream is not in "open" or
    # "half closed (local)" state, the recipient MUST respond with a
    # stream error (Section 5.4.2) of type STREAM_CLOSED.
    stream_error(:stream_closed) unless @state == :open ||
                                        @state == :half_closed_local ||
                                        @state == :half_closing || @state == :closing ||
                                        (@state == :closed && @closed == :local_rst)
    @received_data = true
    calculate_content_length(frame[:length])
    update_local_window(frame)
    # Emit DATA frame
    emit(:data, frame[:payload]) unless frame[:ignore]
    calculate_window_update(@local_window_max_size)
  when :headers
    stream_error(:stream_closed) if (@state == :closed && @closed != :local_rst) ||
                                    @state == :remote_closed
    @_method ||= frame[:method]
    @_status_code ||= frame[:status]
    @_content_length ||= frame[:content_length]
    @_trailers ||= frame[:trailer]
    if @_waiting_on_trailers
      verify_trailers(frame)
    elsif @received_data &&
          (!@_status_code || @_status_code >= 200)

      # An endpoint that receives a HEADERS frame without the END_STREAM flag set after receiving a final
      # (non-informational) status code MUST treat the corresponding request or response as malformed.
      verify_trailers(frame)
    end
    emit(:headers, frame[:payload]) unless frame[:ignore]
    @_waiting_on_trailers = !@_trailers.nil?
  when :push_promise
    emit(:promise_headers, frame[:payload]) unless frame[:ignore]
  when :continuation
    stream_error(:stream_closed) if (@state == :closed && @closed != :local_rst) || @state == :remote_closed
    stream_error(:protocol_error) if @received_data
  when :priority
    process_priority(frame)
  when :window_update
    process_window_update(frame: frame)
  when :altsvc
    # 4.  The ALTSVC HTTP/2 Frame
    # An ALTSVC frame on a
    # stream other than stream 0 containing non-empty "Origin" information
    # is invalid and MUST be ignored.
    emit(frame[:type], frame) if !frame[:origin] || frame[:origin].empty?
  when :blocked
    emit(frame[:type], frame)
  end

  complete_transition(frame)
end

#refuse ⇒ Object

Sends a RST_STREAM indicating that the stream has been refused prior to performing any application processing.



# File 'lib/http/2/next/stream.rb', line 291

def refuse
  send(type: :rst_stream, error: :refused_stream)
end

#reprioritize(weight: 16, dependency: 0, exclusive: false) ⇒ Object

Sends a PRIORITY frame with new stream priority value (can only be performed by the client).

Parameters:

  • weight (Integer) (defaults to: 16)

    new stream weight value

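  • dependency (Integer) (defaults to: 0)

    ID of the stream that this stream now depends on

  • exclusive (Boolean) (defaults to: false)

    whether the new dependency is exclusive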



# File 'lib/http/2/next/stream.rb', line 239

def reprioritize(weight: 16, dependency: 0, exclusive: false)
  stream_error if @id.even?
  send(type: :priority, weight: weight, dependency: dependency, exclusive: exclusive)
end

#send(frame) ⇒ Object

Processes outgoing HTTP 2.0 frames. Data frames may be automatically split and buffered based on maximum frame size and current stream flow control window size.

Parameters:

  • frame (Hash)


# File 'lib/http/2/next/stream.rb', line 194

def send(frame)
  process_priority(frame) if frame[:type] == :priority

  case frame[:type]
  when :data
    # @remote_window is maintained in send_data
    send_data(frame)
  when :window_update
    manage_state(frame) do
      @local_window += frame[:increment]
      emit(:frame, frame)
    end
  else
    manage_state(frame) do
      emit(:frame, frame)
    end
  end
end
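
The :window_update branch above grows the local window by the frame's increment before the frame is emitted. The sketch below shows that bookkeeping in isolation. `WindowSketch` is a hypothetical stand-in, and its `receive_data` method assumes that incoming DATA shrinks the local window by the number of bytes received, which is handled elsewhere in the real class.

```ruby
# Hypothetical stand-in showing the local (receive) window bookkeeping.
class WindowSketch
  attr_reader :local_window, :sent_frames

  def initialize(initial_window)
    @local_window = initial_window
    @state = :open
    @sent_frames = []
  end

  # Assumption: receiving DATA consumes receive-window credit.
  def receive_data(bytes)
    @local_window -= bytes
  end

  def close
    @state = :closed
  end

  # Skipped once the stream is closed; otherwise the window grows by the
  # increment and a WINDOW_UPDATE frame is recorded as sent.
  def window_update(increment)
    return if @state == :closed

    @local_window += increment
    @sent_frames << { type: :window_update, increment: increment }
  end
end

stream = WindowSketch.new(65_535)
stream.receive_data(16_384)
puts stream.local_window # 49151

stream.window_update(16_384)
puts stream.local_window # 65535

stream.close
stream.window_update(1_000)
puts stream.sent_frames.size # 1
```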

#verify_trailers(frame) ⇒ Object



# File 'lib/http/2/next/stream.rb', line 166

def verify_trailers(frame)
  stream_error(:protocol_error, msg: "trailer headers frame must close the stream") unless end_stream?(frame)
  return unless @_trailers

  trailers = frame[:payload]
  return unless trailers.respond_to?(:each)

  trailers.each do |field, _|
    @_trailers.delete(field)
    break if @_trailers.empty?
  end
  stream_error(:protocol_error, msg: "didn't receive all expected trailer headers") unless @_trailers.empty?
end
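
The bookkeeping in the loop above can be illustrated with a function that returns whichever declared trailers never arrived. This is a sketch under assumptions: `missing_trailers` is a hypothetical name, the declared trailers are taken to be an Array of field names, and the received payload is taken to be an Array of name and value pairs.

```ruby
# Hypothetical helper: returns the declared trailer names that were not received.
# Assumes `expected` is an Array of field names and `received` is an Array
# of [name, value] pairs.
def missing_trailers(expected, received)
  remaining = expected.dup # leave the caller's list untouched
  received.each do |field, _value|
    remaining.delete(field)
    break if remaining.empty?
  end
  remaining
end

declared = ["x-checksum", "x-duration"]

p missing_trailers(declared, [["x-checksum", "abc123"], ["x-duration", "12"]]) # []
p missing_trailers(declared, [["x-checksum", "abc123"]])                       # ["x-duration"]
```

In the method above, a non-empty remainder results in a protocol error, while extra fields that were never declared are ignored.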

#window_update(increment) ⇒ Object

Sends a WINDOW_UPDATE frame to the peer.

Parameters:

  • increment (Integer)


# File 'lib/http/2/next/stream.rb', line 298

def window_update(increment)
  # emit stream-level WINDOW_UPDATE unless stream is closed
  return if @state == :closed || @state == :remote_closed

  send(type: :window_update, increment: increment)
end