Class: HTTP2::Stream

Inherits:
Object
  • Object
show all
Includes:
Emitter, Error, FlowBuffer
Defined in:
lib/http/2/stream.rb

Overview

A single HTTP 2.0 connection can multiplex multiple streams in parallel: multiple requests and responses can be in flight simultaneously and stream data can be interleaved and prioritized.

This class encapsulates all of the state, transition, flow-control, and error management as defined by the HTTP 2.0 specification. All you have to do is subscribe to appropriate events (marked with “:” prefix in diagram below) and provide your application logic to handle request and response processing.

                      +--------+
                 PP   |        |   PP
             ,--------|  idle  |--------.
            /         |        |         \
           v          +--------+          v
    +----------+          |           +----------+
    |          |          | H         |          |
,---|:reserved |          |           |:reserved |---.
|   | (local)  |          v           | (remote) |   |
|   +----------+      +--------+      +----------+   |
|      | :active      |        |      :active |      |
|      |      ,-------|:active |-------.      |      |
|      | H   /   ES   |        |   ES   \   H |      |
|      v    v         +--------+         v    v      |
|   +-----------+          |          +-----------+  |
|   |:half_close|          |          |:half_close|  |
|   |  (remote) |          |          |  (local)  |  |
|   +-----------+          |          +-----------+  |
|        |                 v                |        |
|        |    ES/R    +--------+    ES/R    |        |
|        `----------->|        |<-----------'        |
| R                   | :close |                   R |
`-------------------->|        |<--------------------'
                      +--------+

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Emitter

#add_listener, #emit, #once

Methods included from FlowBuffer

#buffered_amount

Constructor Details

#initialize(connection:, id:, weight: 16, dependency: 0, exclusive: false, parent: nil, state: :idle) ⇒ Stream

Initializes new stream.

Note that you should never have to call this directly. To create a new client initiated stream, use Connection#new_stream. Similarly, Connection will emit new stream objects, when new stream frames are received.

Parameters:

  • id (Integer)
  • weight (Integer) (defaults to: 16)
  • dependency (Integer) (defaults to: 0)
  • exclusive (Boolean) (defaults to: false)
  • window (Integer)
  • parent (Stream) (defaults to: nil)
  • state (Symbol) (defaults to: :idle)


76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/http/2/stream.rb', line 76

def initialize(connection:, id:, weight: 16, dependency: 0, exclusive: false, parent: nil, state: :idle)
  @connection = connection
  @id = id
  @weight = weight
  @dependency = dependency
  process_priority(weight: weight, stream_dependency: dependency, exclusive: exclusive)
  @local_window_max_size = connection.local_settings[:settings_initial_window_size]
  @local_window  = connection.local_settings[:settings_initial_window_size]
  @remote_window = connection.remote_settings[:settings_initial_window_size]
  @parent = parent
  @state  = state
  @error  = false
  @closed = false
  @send_buffer = []

  on(:window) { |v| @remote_window = v }
  on(:local_window) { |v| @local_window_max_size = @local_window = v }
end

Instance Attribute Details

#closedObject (readonly)

Reason why connection was closed.



61
62
63
# File 'lib/http/2/stream.rb', line 61

def closed
  @closed
end

#dependencyObject (readonly)

Returns the value of attribute dependency.



54
55
56
# File 'lib/http/2/stream.rb', line 54

def dependency
  @dependency
end

#idObject (readonly)

Stream ID (odd for client initiated streams, even otherwise).



44
45
46
# File 'lib/http/2/stream.rb', line 44

def id
  @id
end

#local_windowObject (readonly) Also known as: window

Size of current stream flow control window.



57
58
59
# File 'lib/http/2/stream.rb', line 57

def local_window
  @local_window
end

#parentObject (readonly)

Request parent stream of push stream.



50
51
52
# File 'lib/http/2/stream.rb', line 50

def parent
  @parent
end

#remote_windowObject (readonly)

Returns the value of attribute remote_window.



54
55
56
# File 'lib/http/2/stream.rb', line 54

def remote_window
  @remote_window
end

#stateObject (readonly)

Stream state as defined by HTTP 2.0.



47
48
49
# File 'lib/http/2/stream.rb', line 47

def state
  @state
end

#weightObject (readonly)

Stream priority as set by initiator.



53
54
55
# File 'lib/http/2/stream.rb', line 53

def weight
  @weight
end

Instance Method Details

#cancelObject

Sends a RST_STREAM indicating that the stream is no longer needed.



229
230
231
# File 'lib/http/2/stream.rb', line 229

def cancel
  send(type: :rst_stream, error: :cancel)
end

#chunk_data(payload, max_size) ⇒ Object

Chunk data into max_size, yield each chunk, then return final chunk



210
211
212
213
214
215
216
217
218
# File 'lib/http/2/stream.rb', line 210

def chunk_data(payload, max_size)
  total = payload.bytesize
  cursor = 0
  while (total - cursor) > max_size
    yield payload.byteslice(cursor, max_size)
    cursor += max_size
  end
  payload.byteslice(cursor, total - cursor)
end

#close(error = :stream_closed) ⇒ Object

Sends a RST_STREAM frame which closes current stream - this does not close the underlying connection.

Parameters:

  • error (:Symbol) (defaults to: :stream_closed)

    optional reason why stream was closed



224
225
226
# File 'lib/http/2/stream.rb', line 224

def close(error = :stream_closed)
  send(type: :rst_stream, error: error)
end

#closed?Boolean

Returns:

  • (Boolean)


95
96
97
# File 'lib/http/2/stream.rb', line 95

def closed?
  @state == :closed
end

#data(payload, end_stream: true) ⇒ Object

Sends DATA frame containing response payload.

Parameters:

  • payload (String)
  • end_stream (Boolean) (defaults to: true)

    indicates last response DATA frame



192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/http/2/stream.rb', line 192

def data(payload, end_stream: true)
  # Split data according to each frame is smaller enough
  # TODO: consider padding?
  max_size = @connection.remote_settings[:settings_max_frame_size]

  if payload.bytesize > max_size
    payload = chunk_data(payload, max_size) do |chunk|
      send(type: :data, flags: [], payload: chunk)
    end
  end

  flags = []
  flags << :end_stream if end_stream
  send(type: :data, flags: flags, payload: payload)
end

#headers(headers, end_headers: true, end_stream: false) ⇒ Object

Sends a HEADERS frame containing HTTP response headers. All pseudo-header fields MUST appear in the header block before regular header fields.

Parameters:

  • headers (Array or Hash)

    Array of key-value pairs or Hash

  • end_headers (Boolean) (defaults to: true)

    indicates that no more headers will be sent

  • end_stream (Boolean) (defaults to: false)

    indicates that no payload will be sent



163
164
165
166
167
168
169
# File 'lib/http/2/stream.rb', line 163

def headers(headers, end_headers: true, end_stream: false)
  flags = []
  flags << :end_headers if end_headers
  flags << :end_stream  if end_stream

  send(type: :headers, flags: flags, payload: headers)
end

#promise(headers, end_headers: true, &block) ⇒ Object

Raises:

  • (ArgumentError)


171
172
173
174
175
176
# File 'lib/http/2/stream.rb', line 171

def promise(headers, end_headers: true, &block)
  raise ArgumentError, 'must provide callback' unless block_given?

  flags = end_headers ? [:end_headers] : []
  emit(:promise, self, headers, flags, &block)
end

#receive(frame) ⇒ Object Also known as: <<

Processes incoming HTTP 2.0 frames. The frames must be decoded upstream.

Parameters:

  • frame (Hash)


102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/http/2/stream.rb', line 102

def receive(frame)
  transition(frame, false)

  case frame[:type]
  when :data
    update_local_window(frame)
    # Emit DATA frame
    emit(:data, frame[:payload]) unless frame[:ignore]
    calculate_window_update(@local_window_max_size)
  when :headers
    emit(:headers, frame[:payload]) unless frame[:ignore]
  when :push_promise
    emit(:promise_headers, frame[:payload]) unless frame[:ignore]
  when :priority
    process_priority(frame)
  when :window_update
    process_window_update(frame)
  when :altsvc
    # 4.  The ALTSVC HTTP/2 Frame
    # An ALTSVC frame on a
    # stream other than stream 0 containing non-empty "Origin" information
    # is invalid and MUST be ignored.
    emit(frame[:type], frame) if !frame[:origin] || frame[:origin].empty?
  when :blocked
    emit(frame[:type], frame)
  end

  complete_transition(frame)
end

#refuseObject

Sends a RST_STREAM indicating that the stream has been refused prior to performing any application processing.



235
236
237
# File 'lib/http/2/stream.rb', line 235

def refuse
  send(type: :rst_stream, error: :refused_stream)
end

#reprioritize(weight: 16, dependency: 0, exclusive: false) ⇒ Object

Sends a PRIORITY frame with new stream priority value (can only be performed by the client).

Parameters:

  • weight (Integer) (defaults to: 16)

    new stream weight value

  • dependency (Integer) (defaults to: 0)

    new stream dependency stream



183
184
185
186
# File 'lib/http/2/stream.rb', line 183

def reprioritize(weight: 16, dependency: 0, exclusive: false)
  stream_error if @id.even?
  send(type: :priority, weight: weight, stream_dependency: dependency, exclusive: exclusive)
end

#send(frame) ⇒ Object

Processes outgoing HTTP 2.0 frames. Data frames may be automatically split and buffered based on maximum frame size and current stream flow control window size.

Parameters:

  • frame (Hash)


138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/http/2/stream.rb', line 138

def send(frame)
  process_priority(frame) if frame[:type] == :priority

  case frame[:type]
  when :data
    # @remote_window is maintained in send_data
    send_data(frame)
  when :window_update
    manage_state(frame) do
      @local_window += frame[:increment]
      emit(:frame, frame)
    end
  else
    manage_state(frame) do
      emit(:frame, frame)
    end
  end
end

#window_update(increment) ⇒ Object

Sends a WINDOW_UPDATE frame to the peer.

Parameters:

  • increment (Integer)


242
243
244
245
246
247
# File 'lib/http/2/stream.rb', line 242

def window_update(increment)
  # emit stream-level WINDOW_UPDATE unless stream is closed
  return if @state == :closed || @state == :remote_closed

  send(type: :window_update, increment: increment)
end