Class: HTTP2::Stream

Inherits:
Object
Includes:
Emitter, Error, FlowBuffer
Defined in:
lib/http/2/stream.rb

Overview

A single HTTP 2.0 connection can multiplex multiple streams in parallel: multiple requests and responses can be in flight simultaneously and stream data can be interleaved and prioritized.

This class encapsulates all of the state, transition, flow-control, and error management defined by the HTTP 2.0 specification. All you have to do is subscribe to the appropriate events (marked with a “:” prefix in the diagram below) and provide the application logic that handles request and response processing.

                      +--------+
                 PP   |        |   PP
             ,--------|  idle  |--------.
            /         |        |         \
           v          +--------+          v
    +----------+          |           +----------+
    |          |          | H         |          |
,---|:reserved |          |           |:reserved |---.
|   | (local)  |          v           | (remote) |   |
|   +----------+      +--------+      +----------+   |
|      | :active      |        |      :active |      |
|      |      ,-------|:active |-------.      |      |
|      | H   /   ES   |        |   ES   \   H |      |
|      v    v         +--------+         v    v      |
|   +-----------+          |          +-----------+  |
|   |:half_close|          |          |:half_close|  |
|   |  (remote) |          |          |  (local)  |  |
|   +-----------+          |          +-----------+  |
|        |                 v                |        |
|        |    ES/R    +--------+    ES/R    |        |
|        `----------->|        |<-----------'        |
| R                   | :close |                   R |
`-------------------->|        |<--------------------'
                      +--------+
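
The diagram can also be read as a lookup table. The sketch below is a minimal illustration, not the library's internal state machine. It assumes the abbreviations match the state diagram in the HTTP/2 specification (PP = PUSH_PROMISE, H = HEADERS, ES = END_STREAM flag, R = RST_STREAM) and takes the send/receive directions from that specification, since the diagram omits them. The state symbols, event symbols, and the `next_state` helper are names invented for this example.

```ruby
# Illustrative transition table. State and event names are this sketch's own.
TRANSITIONS = {
  idle: {
    send_push_promise: :reserved_local,
    recv_push_promise: :reserved_remote,
    send_headers: :open,
    recv_headers: :open
  },
  reserved_local: {
    send_headers: :half_closed_remote,
    send_rst: :closed,
    recv_rst: :closed
  },
  reserved_remote: {
    recv_headers: :half_closed_local,
    send_rst: :closed,
    recv_rst: :closed
  },
  open: {
    send_end_stream: :half_closed_local,
    recv_end_stream: :half_closed_remote,
    send_rst: :closed,
    recv_rst: :closed
  },
  half_closed_local: {
    recv_end_stream: :closed,
    send_rst: :closed,
    recv_rst: :closed
  },
  half_closed_remote: {
    send_end_stream: :closed,
    send_rst: :closed,
    recv_rst: :closed
  }
}.freeze

# Returns the state reached from `state` on `event`, or raises if the
# diagram has no such edge (for example, anything leaving :closed).
def next_state(state, event)
  TRANSITIONS.fetch(state, {}).fetch(event) do
    raise ArgumentError, "no transition from #{state} on #{event}"
  end
end
```

A table keeps every legal edge visible in one place, which makes it easy to compare against the diagram; an edge that is missing from the table is treated as an error.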

Methods included from Emitter

#add_listener, #emit, #once
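
To make the event subscription model concrete, here is a minimal stand-in for an emitter with the three listed methods. It is a sketch written for this page, not the gem's Emitter module: `SketchEmitter` and `FakeStream` are hypothetical names, and `on` is assumed to be an alias of `add_listener` because the constructor source calls `on(...)`.

```ruby
# Hypothetical stand-in for an event emitter mixin (not the gem's code).
module SketchEmitter
  # Registers a persistent listener for +event+.
  def add_listener(event, &block)
    raise ArgumentError, 'must provide callback' unless block

    listeners(event) << block
    self
  end
  alias on add_listener # assumption: `on` aliases add_listener

  # Registers a listener that is removed before its first invocation runs.
  def once(event, &block)
    raise ArgumentError, 'must provide callback' unless block

    wrapper = lambda do |*args|
      listeners(event).delete(wrapper)
      block.call(*args)
    end
    listeners(event) << wrapper
    self
  end

  # Invokes every listener registered for +event+ with +args+.
  def emit(event, *args)
    listeners(event).dup.each { |callback| callback.call(*args) }
    self
  end

  private

  def listeners(event)
    @listeners ||= Hash.new { |hash, key| hash[key] = [] }
    @listeners[event]
  end
end

class FakeStream
  include SketchEmitter
end

stream = FakeStream.new
received = []
stream.on(:data) { |chunk| received << chunk }
stream.once(:close) { received << :closed }
stream.emit(:data, 'hello')
stream.emit(:close)
stream.emit(:close) # no effect: the one-shot listener is already gone
```

Iterating over a copy of the listener list in `emit` lets a one-shot listener remove itself without disturbing the iteration.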

Methods included from FlowBuffer

#buffered_amount

Constructor Details

#initialize(connection:, id:, weight: 16, dependency: 0, exclusive: false, parent: nil) ⇒ Stream

Initializes new stream.

You should never need to call this directly. To create a new client-initiated stream, use Connection#new_stream. Similarly, Connection emits new stream objects when frames for a new stream are received.



# File 'lib/http/2/stream.rb', line 74

def initialize(connection:, id:, weight: 16, dependency: 0, exclusive: false, parent: nil)
  @connection = connection
  @id = id
  @weight = weight
  @dependency = dependency
  process_priority(weight: weight, stream_dependency: dependency, exclusive: exclusive)
  @local_window  = connection.local_settings[:settings_initial_window_size]
  @remote_window = connection.remote_settings[:settings_initial_window_size]
  @parent = parent
  @state  = :idle
  @error  = false
  @closed = false
  @send_buffer = []

  on(:window) { |v| @remote_window = v }
  on(:local_window) { |v| @local_window = v }
end

Instance Attribute Details

#closed ⇒ Object (readonly)

Reason why the stream was closed (false while the stream has not been closed).



# File 'lib/http/2/stream.rb', line 60

def closed
  @closed
end

#dependency ⇒ Object (readonly)

Returns the value of attribute dependency



# File 'lib/http/2/stream.rb', line 52

def dependency
  @dependency
end

#id ⇒ Object (readonly)

Stream ID (odd for client-initiated streams, even otherwise).



# File 'lib/http/2/stream.rb', line 42

def id
  @id
end
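
The odd/even rule can be expressed as two small predicates. These helpers are hypothetical and not part of HTTP2::Stream; they also assume that ID 0 belongs to neither side, since HTTP/2 reserves stream 0 for connection-level frames.

```ruby
# Hypothetical helpers illustrating the stream ID parity rule.

# Client-initiated streams use odd identifiers.
def client_initiated?(stream_id)
  stream_id.positive? && stream_id.odd?
end

# Server-initiated (pushed) streams use even, non-zero identifiers.
def server_initiated?(stream_id)
  stream_id.positive? && stream_id.even?
end
```

The same parity check appears in #reprioritize, which raises a stream error when the ID is even.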

#local_window ⇒ Object (readonly) Also known as: window

Size of the current stream flow-control window.



# File 'lib/http/2/stream.rb', line 55

def local_window
  @local_window
end

#parent ⇒ Object (readonly)

Parent request stream of a push stream.



# File 'lib/http/2/stream.rb', line 48

def parent
  @parent
end

#remote_window ⇒ Object (readonly)

Returns the value of attribute remote_window



# File 'lib/http/2/stream.rb', line 56

def remote_window
  @remote_window
end

#state ⇒ Object (readonly)

Stream state as defined by HTTP 2.0.



# File 'lib/http/2/stream.rb', line 45

def state
  @state
end

#weight ⇒ Object (readonly)

Stream priority as set by initiator.



# File 'lib/http/2/stream.rb', line 51

def weight
  @weight
end

Instance Method Details

#cancel ⇒ Object

Sends a RST_STREAM indicating that the stream is no longer needed.



# File 'lib/http/2/stream.rb', line 199

def cancel
  send(type: :rst_stream, error: :cancel)
end

#close(error = :stream_closed) ⇒ Object

Sends a RST_STREAM frame that closes the current stream; this does not close the underlying connection.



# File 'lib/http/2/stream.rb', line 194

def close(error = :stream_closed)
  send(type: :rst_stream, error: error)
end
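
#cancel, #close, and #refuse all hand a RST_STREAM frame hash to #send and differ only in the error symbol. The sketch below rebuilds those hashes outside the class; `rst_stream_frame` and `wire_code` are hypothetical helpers, and the numeric values are the error codes that RFC 7540 assigns to these three names. How the library itself maps symbols to codes is not shown on this page.

```ruby
# Error codes assigned by RFC 7540 for the three symbols used on this page.
RST_ERROR_CODES = {
  stream_closed: 0x5,
  refused_stream: 0x7,
  cancel: 0x8
}.freeze

# Hypothetical helper: the frame hash that #cancel, #close and #refuse
# pass to #send, parameterised by the error symbol.
def rst_stream_frame(error = :stream_closed)
  { type: :rst_stream, error: error }
end

# Hypothetical helper: numeric code for one of the symbols above.
def wire_code(error)
  RST_ERROR_CODES.fetch(error)
end
```

For example, `rst_stream_frame(:cancel)` corresponds to what #cancel sends, and `rst_stream_frame` with no argument corresponds to the default of #close.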

#data(payload, end_stream: true) ⇒ Object

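Sends a DATA frame containing the response payload. A payload larger than the peer's maximum frame size is split across several DATA frames, and only the last one carries the END_STREAM flag.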



# File 'lib/http/2/stream.rb', line 175

def data(payload, end_stream: true)
  flags = []
  flags << :end_stream if end_stream

  # Split the payload so that every frame fits within the peer's maximum frame size
  # TODO: consider padding?
  max_size = @connection.remote_settings[:settings_max_frame_size]
  while payload.bytesize > max_size
    chunk = payload.slice!(0, max_size)
    send(type: :data, payload: chunk)
  end

  send(type: :data, flags: flags, payload: payload)
end
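
The splitting loop can be tried in isolation. The sketch below is a hypothetical `split_payload` helper, not library code. Unlike the method above, it works on a binary copy, so the caller's string is left untouched and sizes are counted in bytes.

```ruby
# Hypothetical helper: splits +payload+ into chunks of at most +max_size+
# bytes. The final chunk is always returned, even if it is empty, which
# mirrors the method above always sending one last DATA frame.
def split_payload(payload, max_size)
  raise ArgumentError, 'max_size must be positive' unless max_size.positive?

  bytes = payload.b # binary copy, so offsets below are byte offsets
  chunks = []
  offset = 0
  while bytes.bytesize - offset > max_size
    chunks << bytes.byteslice(offset, max_size)
    offset += max_size
  end
  chunks << bytes.byteslice(offset, bytes.bytesize - offset)
  chunks
end
```

Two differences from the method above are worth noting: `String#slice!` removes the chunk from the caller's string, and with two integer arguments it counts characters, not bytes, so the method presumably expects a binary-encoded payload.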

#headers(headers, end_headers: true, end_stream: false) ⇒ Object

Sends a HEADERS frame containing HTTP response headers.



# File 'lib/http/2/stream.rb', line 146

def headers(headers, end_headers: true, end_stream: false)
  flags = []
  flags << :end_headers if end_headers
  flags << :end_stream  if end_stream

  send(type: :headers, flags: flags, payload: headers.to_a)
end

#promise(headers, end_headers: true, &block) ⇒ Object

Emits a :promise event carrying this stream, the given headers, and the frame flags, so that a push promise can be created. A callback block is required; ArgumentError is raised without one.



# File 'lib/http/2/stream.rb', line 154

def promise(headers, end_headers: true, &block)
  fail ArgumentError, 'must provide callback' unless block_given?

  flags = end_headers ? [:end_headers] : []
  emit(:promise, self, headers, flags, &block)
end

#receive(frame) ⇒ Object Also known as: <<

Processes incoming HTTP 2.0 frames. The frames must be decoded upstream.



# File 'lib/http/2/stream.rb', line 95

def receive(frame)
  transition(frame, false)

  case frame[:type]
  when :data
    @local_window -= frame[:payload].size
    emit(:data, frame[:payload]) unless frame[:ignore]
  when :headers, :push_promise
    emit(:headers, frame[:payload]) unless frame[:ignore]
  when :priority
    process_priority(frame)
  when :window_update
    process_window_update(frame)
  when :altsvc, :blocked
    emit(frame[:type], frame)
  end

  complete_transition(frame)
end

#refuse ⇒ Object

Sends a RST_STREAM indicating that the stream has been refused prior to performing any application processing.



# File 'lib/http/2/stream.rb', line 205

def refuse
  send(type: :rst_stream, error: :refused_stream)
end

#reprioritize(weight: 16, dependency: 0, exclusive: false) ⇒ Object

Sends a PRIORITY frame with new stream priority value (can only be performed by the client).



# File 'lib/http/2/stream.rb', line 166

def reprioritize(weight: 16, dependency: 0, exclusive: false)
  stream_error if @id.even?
  send(type: :priority, weight: weight, stream_dependency: dependency, exclusive: exclusive)
end

#send(frame) ⇒ Object

Processes outgoing HTTP 2.0 frames. Data frames may be automatically split and buffered based on maximum frame size and current stream flow control window size.



# File 'lib/http/2/stream.rb', line 121

def send(frame)
  transition(frame, true)
  frame[:stream] ||= @id

  process_priority(frame) if frame[:type] == :priority

  case frame[:type]
  when :data
    # @remote_window is maintained in send_data
    send_data(frame)
  when :window_update
    @local_window += frame[:increment]
    emit(:frame, frame)
  else
    emit(:frame, frame)
  end

  complete_transition(frame)
end
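
The buffering described above happens in send_data, which this page does not show. As a rough illustration of the idea only, the sketch below queues outgoing payloads and releases as many bytes as the peer's window allows. `SendWindowSketch` and all of its methods are hypothetical; the real implementation also has to respect the maximum frame size and the connection-level window, which are ignored here.

```ruby
# Hypothetical model of window-limited sending (not the library's send_data).
class SendWindowSketch
  attr_reader :remote_window, :sent

  def initialize(remote_window)
    @remote_window = remote_window # bytes the peer is willing to accept
    @pending = []                  # payloads waiting for window credit
    @sent = []                     # payloads that were "put on the wire"
  end

  # Number of bytes still waiting in the buffer.
  def buffered_amount
    @pending.sum(&:bytesize)
  end

  # Queues a payload and sends whatever currently fits.
  def write(payload)
    @pending << payload.b
    drain
  end

  # Models a WINDOW_UPDATE from the peer, then retries buffered data.
  def window_update(increment)
    @remote_window += increment
    drain
  end

  private

  def drain
    until @pending.empty? || @remote_window <= 0
      chunk = @pending.shift
      if chunk.bytesize > @remote_window
        # Send only what fits and keep the remainder at the head of the queue.
        rest = chunk.byteslice(@remote_window, chunk.bytesize - @remote_window)
        chunk = chunk.byteslice(0, @remote_window)
        @pending.unshift(rest)
      end
      @sent << chunk
      @remote_window -= chunk.bytesize
    end
  end
end
```

With a window of 5 bytes, writing an 11-byte payload sends the first 5 bytes and buffers the remaining 6 until `window_update` grants more credit.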