Class: HTTP2::Stream

Inherits:
Object
  • Object
show all
Includes:
Emitter, Error, FlowBuffer
Defined in:
lib/http/2/stream.rb

Overview

A single HTTP 2.0 connection can multiplex multiple streams in parallel: multiple requests and responses can be in flight simultaneously and stream data can be interleaved and prioritized.

This class encapsulates all of the state, transition, flow-control, and error management as defined by the HTTP 2.0 specification. All you have to do is subscribe to appropriate events (marked with “:” prefix in diagram below) and provide your application logic to handle request and response processing.

                      +--------+
                 PP   |        |   PP
             ,--------|  idle  |--------.
            /         |        |         \
           v          +--------+          v
    +----------+          |           +----------+
    |          |          | H         |          |
,---|:reserved |          |           |:reserved |---.
|   | (local)  |          v           | (remote) |   |
|   +----------+      +--------+      +----------+   |
|      | :active      |        |      :active |      |
|      |      ,-------|:active |-------.      |      |
|      | H   /   ES   |        |   ES   \   H |      |
|      v    v         +--------+         v    v      |
|   +-----------+          |          +-----------+  |
|   |:half_close|          |          |:half_close|  |
|   |  (remote) |          |          |  (local)  |  |
|   +-----------+          |          +-----------+  |
|        |                 v                |        |
|        |    ES/R    +--------+    ES/R    |        |
|        `----------->|        |<-----------'        |
| R                   | :close |                   R |
`-------------------->|        |<--------------------'
                      +--------+

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Emitter

#add_listener, #emit, #once

Methods included from FlowBuffer

#buffered_amount

Constructor Details

#initialize(connection, id, weight: 16, dependency: 0, exclusive: false, parent: nil) ⇒ Stream

Initializes new stream.

Note that you should never have to call this directly. To create a new client initiated stream, use Connection#new_stream. Similarly, Connection will emit new stream objects, when new stream frames are received.

Parameters:

  • id (Integer)
  • weight (Integer) (defaults to: 16)
  • dependency (Integer) (defaults to: 0)
  • exclusive (Boolean) (defaults to: false)
  • window (Integer)
  • parent (Stream) (defaults to: nil)


74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/http/2/stream.rb', line 74

def initialize(connection, id, weight: 16, dependency: 0, exclusive: false, parent: nil)
  @connection = connection
  @id = id
  @weight = weight
  @dependency = dependency
  process_priority(weight: weight, stream_dependency: dependency, exclusive: exclusive)
  @local_window  = connection.local_settings[:settings_initial_window_size]
  @remote_window = connection.remote_settings[:settings_initial_window_size]
  @parent = parent
  @state  = :idle
  @error  = false
  @closed = false
  @send_buffer = []

  on(:window) { |v| @remote_window = v }
  on(:local_window) { |v| @local_window = v }
end

Instance Attribute Details

#closedObject (readonly)

Reason why connection was closed.



60
61
62
# File 'lib/http/2/stream.rb', line 60

def closed
  @closed
end

#dependencyObject (readonly)

Returns the value of attribute dependency.



52
53
54
# File 'lib/http/2/stream.rb', line 52

def dependency
  @dependency
end

#idObject (readonly)

Stream ID (odd for client initiated streams, even otherwise).



42
43
44
# File 'lib/http/2/stream.rb', line 42

def id
  @id
end

#local_windowObject (readonly) Also known as: window

Size of current stream flow control window.



55
56
57
# File 'lib/http/2/stream.rb', line 55

def local_window
  @local_window
end

#parentObject (readonly)

Request parent stream of push stream.



48
49
50
# File 'lib/http/2/stream.rb', line 48

def parent
  @parent
end

#remote_windowObject (readonly)

Returns the value of attribute remote_window.



56
57
58
# File 'lib/http/2/stream.rb', line 56

def remote_window
  @remote_window
end

#stateObject (readonly)

Stream state as defined by HTTP 2.0.



45
46
47
# File 'lib/http/2/stream.rb', line 45

def state
  @state
end

#weightObject (readonly)

Stream priority as set by initiator.



51
52
53
# File 'lib/http/2/stream.rb', line 51

def weight
  @weight
end

Instance Method Details

#cancelObject

Sends a RST_STREAM indicating that the stream is no longer needed.



198
199
200
# File 'lib/http/2/stream.rb', line 198

def cancel
  send(type: :rst_stream, error: :cancel)
end

#close(error = :stream_closed) ⇒ Object

Sends a RST_STREAM frame which closes current stream - this does not close the underlying connection.

Parameters:

  • error (:Symbol) (defaults to: :stream_closed)

    optional reason why stream was closed



193
194
195
# File 'lib/http/2/stream.rb', line 193

def close(error = :stream_closed)
  send(type: :rst_stream, error: error)
end

#data(payload, end_stream: true) ⇒ Object

Sends DATA frame containing response payload.

Parameters:

  • payload (String)
  • end_stream (Boolean) (defaults to: true)

    indicates last response DATA frame



175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/http/2/stream.rb', line 175

def data(payload, end_stream: true)
  # Split data according to each frame is smaller enough
  # TODO: consider padding?
  max_size = @connection.remote_settings[:settings_max_frame_size]
  while payload.bytesize > max_size
    chunk = payload.slice!(0, max_size)
    send(type: :data, flags: [], payload: chunk)
  end

  flags = []
  flags << :end_stream if end_stream
  send(type: :data, flags: flags, payload: payload)
end

#headers(headers, end_headers: true, end_stream: false) ⇒ Object

Sends a HEADERS frame containing HTTP response headers.

Parameters:

  • headers (Array or Hash)

    Array of key-value pairs or Hash

  • end_headers (Boolean) (defaults to: true)

    indicates that no more headers will be sent

  • end_stream (Boolean) (defaults to: false)

    indicates that no payload will be sent



146
147
148
149
150
151
152
# File 'lib/http/2/stream.rb', line 146

def headers(headers, end_headers: true, end_stream: false)
  flags = []
  flags << :end_headers if end_headers
  flags << :end_stream  if end_stream

  send(type: :headers, flags: flags, payload: headers.to_a)
end

#promise(headers, end_headers: true, &block) ⇒ Object



154
155
156
157
158
159
# File 'lib/http/2/stream.rb', line 154

def promise(headers, end_headers: true, &block)
  fail ArgumentError, 'must provide callback' unless block_given?

  flags = end_headers ? [:end_headers] : []
  emit(:promise, self, headers, flags, &block)
end

#receive(frame) ⇒ Object Also known as: <<

Processes incoming HTTP 2.0 frames. The frames must be decoded upstream.

Parameters:

  • frame (Hash)


95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/http/2/stream.rb', line 95

def receive(frame)
  transition(frame, false)

  case frame[:type]
  when :data
    @local_window -= frame[:payload].size
    emit(:data, frame[:payload]) unless frame[:ignore]
  when :headers, :push_promise
    emit(:headers, frame[:payload]) unless frame[:ignore]
  when :priority
    process_priority(frame)
  when :window_update
    process_window_update(frame)
  when :altsvc, :blocked
    emit(frame[:type], frame)
  end

  complete_transition(frame)
end

#refuseObject

Sends a RST_STREAM indicating that the stream has been refused prior to performing any application processing.



204
205
206
# File 'lib/http/2/stream.rb', line 204

def refuse
  send(type: :rst_stream, error: :refused_stream)
end

#reprioritize(weight: 16, dependency: 0, exclusive: false) ⇒ Object

Sends a PRIORITY frame with new stream priority value (can only be performed by the client).

Parameters:

  • weight (Integer) (defaults to: 16)

    new stream weight value

  • dependency (Integer) (defaults to: 0)

    new stream dependency stream



166
167
168
169
# File 'lib/http/2/stream.rb', line 166

def reprioritize(weight: 16, dependency: 0, exclusive: false)
  stream_error if @id.even?
  send(type: :priority, weight: weight, stream_dependency: dependency, exclusive: exclusive)
end

#send(frame) ⇒ Object

Processes outgoing HTTP 2.0 frames. Data frames may be automatically split and buffered based on maximum frame size and current stream flow control window size.

Parameters:

  • frame (Hash)


121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/http/2/stream.rb', line 121

def send(frame)
  transition(frame, true)
  frame[:stream] ||= @id

  process_priority(frame) if frame[:type] == :priority

  case frame[:type]
  when :data
    # @remote_window is maintained in send_data
    send_data(frame)
  when :window_update
    @local_window += frame[:increment]
    emit(:frame, frame)
  else
    emit(:frame, frame)
  end

  complete_transition(frame)
end