Class: HTTP2::Stream

Inherits:
Object
Includes:
Emitter, Error, FlowBuffer
Defined in:
lib/http/2/stream.rb

Overview

A single HTTP 2.0 connection can multiplex many streams in parallel: several requests and responses can be in flight at once, and their data can be interleaved and prioritized.

This class encapsulates all of the state, transition, flow-control, and error management defined by the HTTP 2.0 specification. Subscribe to the appropriate events (marked with a “:” prefix in the diagram below) and supply the application logic that handles requests and responses. In the diagram, PP is a PUSH_PROMISE frame, H is a HEADERS frame, ES is the END_STREAM flag, and R is a RST_STREAM frame.

                      +--------+
                 PP   |        |   PP
             ,--------|  idle  |--------.
            /         |        |         \
           v          +--------+          v
    +----------+          |           +----------+
    |          |          | H         |          |
,---|:reserved |          |           |:reserved |---.
|   | (local)  |          v           | (remote) |   |
|   +----------+      +--------+      +----------+   |
|      | :active      |        |      :active |      |
|      |      ,-------|:active |-------.      |      |
|      | H   /   ES   |        |   ES   \   H |      |
|      v    v         +--------+         v    v      |
|   +-----------+          |          +-----------+  |
|   |:half_close|          |          |:half_close|  |
|   |  (remote) |          |          |  (local)  |  |
|   +-----------+          |          +-----------+  |
|        |                 v                |        |
|        |    ES/R    +--------+    ES/R    |        |
|        `----------->|        |<-----------'        |
| R                   | :close |                   R |
`-------------------->|        |<--------------------'
                      +--------+
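
The transitions in the diagram can be sketched as a lookup table. This is a simplified illustration, not the library's implementation: the state names, the `TRANSITIONS` and `RESETTABLE` constants, and the `next_state` helper are all hypothetical, and error handling for closed streams is left out.

```ruby
# Simplified sketch of the state diagram (hypothetical names throughout).
# Keys are [current_state, direction, event]; values are the next state.
# Events: :push_promise (PP), :headers (H), :end_stream (ES).
TRANSITIONS = {
  [:idle, :send, :push_promise]             => :reserved_local,
  [:idle, :recv, :push_promise]             => :reserved_remote,
  [:idle, :send, :headers]                  => :open,
  [:idle, :recv, :headers]                  => :open,
  [:reserved_local, :send, :headers]        => :half_closed_remote,
  [:reserved_remote, :recv, :headers]       => :half_closed_local,
  [:open, :send, :end_stream]               => :half_closed_local,
  [:open, :recv, :end_stream]               => :half_closed_remote,
  [:half_closed_local, :recv, :end_stream]  => :closed,
  [:half_closed_remote, :send, :end_stream] => :closed
}.freeze

# States from which RST_STREAM (R) leads straight to :closed in the diagram.
RESETTABLE = %i[
  reserved_local reserved_remote open half_closed_local half_closed_remote
].freeze

# Returns the next state, or raises ArgumentError for a transition the
# diagram does not show.
def next_state(state, direction, event)
  return :closed if event == :rst_stream && RESETTABLE.include?(state)

  TRANSITIONS.fetch([state, direction, event]) do
    raise ArgumentError, "no transition from #{state} on #{direction} #{event}"
  end
end

# A request/response exchange as seen by a server:
state = :idle
state = next_state(state, :recv, :headers)    # request headers arrive
state = next_state(state, :recv, :end_stream) # request body finished
state = next_state(state, :send, :end_stream) # response finished
```

A table keeps each arrow of the diagram visible as one entry, which makes it easy to compare against the specification.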

Instance Attribute Summary

Instance Method Summary

Methods included from Emitter

#add_listener, #emit, #once
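
To illustrate how event subscription with these methods typically behaves, here is a minimal stand-in emitter. It is a hypothetical sketch, not the library's Emitter: the `SketchEmitter` and `FakeStream` names and the `:delete` return-value convention are assumptions made for this example. The `on` alias is included because the constructor source on this page calls `on(...)`.

```ruby
# Hypothetical stand-in for an event emitter mixin (not the library's code).
module SketchEmitter
  # Registers a callback that runs every time +event+ is emitted.
  def add_listener(event, &block)
    raise ArgumentError, "must provide callback" unless block

    listeners(event) << block
  end
  alias on add_listener

  # Registers a callback that runs only for the next emission of +event+.
  def once(event, &block)
    add_listener(event) do |*args|
      block.call(*args)
      :delete # assumed convention: this return value removes the listener
    end
  end

  # Invokes every callback for +event+, dropping those that return :delete.
  def emit(event, *args)
    listeners(event).delete_if { |callback| callback.call(*args) == :delete }
  end

  private

  def listeners(event)
    @listeners ||= Hash.new { |hash, key| hash[key] = [] }
    @listeners[event]
  end
end

# Stand-in for a stream object; a real stream would emit these events itself.
class FakeStream
  include SketchEmitter
end

stream = FakeStream.new
received = []
stream.on(:data) { |chunk| received << chunk }
stream.once(:close) { received << :closed }

stream.emit(:data, "hello")
stream.emit(:close)
stream.emit(:close) # the one-shot listener has already been removed
```

After these calls, `received` holds `["hello", :closed]`: the second `:close` emission finds no listener.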

Methods included from FlowBuffer

#buffered_amount

Constructor Details

#initialize(connection: nil, id: nil, weight: 16, dependency: 0, exclusive: false, parent: nil) ⇒ Stream

Initializes a new stream.

Note that you should never have to call this directly. To create a new client-initiated stream, use Connection#new_stream. Similarly, Connection emits new stream objects when frames for a new stream are received.



# File 'lib/http/2/stream.rb', line 75

def initialize(connection: nil, id: nil, weight: 16, dependency: 0, exclusive: false, parent: nil)
  @connection = connection or raise ArgumentError.new("missing mandatory argument connection")
  @id = id                 or raise ArgumentError.new("missing mandatory argument id")
  @weight = weight
  @dependency = dependency
  process_priority({weight: weight, stream_dependency: dependency, exclusive: exclusive})
  @local_window  = connection.local_settings[:settings_initial_window_size]
  @remote_window = connection.remote_settings[:settings_initial_window_size]
  @parent = parent
  @state  = :idle
  @error  = false
  @closed = false
  @send_buffer = []

  on(:window) { |v| @remote_window = v }
  on(:local_window) { |v| @local_window = v }
end

Instance Attribute Details

#closed ⇒ Object (readonly)

Reason why the stream was closed (false while the stream is still open).



# File 'lib/http/2/stream.rb', line 61

def closed
  @closed
end

#dependency ⇒ Object (readonly)

Returns the value of attribute dependency.



# File 'lib/http/2/stream.rb', line 53

def dependency
  @dependency
end

#id ⇒ Object (readonly)

Stream ID (odd for client-initiated streams, even otherwise).



# File 'lib/http/2/stream.rb', line 43

def id
  @id
end
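
The odd/even rule for stream IDs can be checked with a one-line predicate. This is a small sketch; the `client_initiated?` helper is hypothetical and not part of the class.

```ruby
# Hypothetical helper: client-initiated streams carry odd IDs,
# all other streams carry even IDs.
def client_initiated?(stream_id)
  stream_id.odd?
end

client_initiated?(1) # first stream opened by a client
client_initiated?(2) # an even ID, so not client-initiated
```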

#local_window ⇒ Object (readonly) Also known as: window

Current size of the stream's local flow control window.



# File 'lib/http/2/stream.rb', line 56

def local_window
  @local_window
end

#parent ⇒ Object (readonly)

Parent request stream of a push stream.



# File 'lib/http/2/stream.rb', line 49

def parent
  @parent
end

#remote_window ⇒ Object (readonly)

Returns the value of attribute remote_window.



# File 'lib/http/2/stream.rb', line 57

def remote_window
  @remote_window
end

#state ⇒ Object (readonly)

Stream state as defined by HTTP 2.0.



# File 'lib/http/2/stream.rb', line 46

def state
  @state
end

#weight ⇒ Object (readonly)

Stream priority as set by the initiator.



# File 'lib/http/2/stream.rb', line 52

def weight
  @weight
end

Instance Method Details

#cancel ⇒ Object

Sends a RST_STREAM indicating that the stream is no longer needed.



# File 'lib/http/2/stream.rb', line 200

def cancel
  send({type: :rst_stream, error: :cancel})
end

#close(error = :stream_closed) ⇒ Object

Sends a RST_STREAM frame that closes the current stream. This does not close the underlying connection.



# File 'lib/http/2/stream.rb', line 195

def close(error = :stream_closed)
  send({type: :rst_stream, error: error})
end

#data(payload, end_stream: true) ⇒ Object

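Sends one or more DATA frames containing the response payload. Payloads larger than the peer's maximum frame size are split across several frames.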



# File 'lib/http/2/stream.rb', line 176

def data(payload, end_stream: true)
  flags = []
  flags << :end_stream if end_stream

  # Split the payload so that no frame exceeds the peer's maximum frame size
  # TODO: consider padding?
  max_size = @connection.remote_settings[:settings_max_frame_size]
  while payload.bytesize > max_size do
    chunk = payload.slice!(0, max_size)
    send({type: :data, payload: chunk})
  end

  send({type: :data, flags: flags, payload: payload})
end
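
The splitting step can be tried in isolation. The sketch below is a hypothetical standalone helper (`split_into_data_frames` is not part of the class) that returns the frames instead of sending them. Unlike the method above, it works on a binary copy and slices by bytes, so the caller's string is left untouched and multibyte characters cannot push a chunk over the size limit.

```ruby
# Hypothetical helper: split +payload+ into DATA frame hashes whose payloads
# are at most +max_size+ bytes. Only the final frame may carry :end_stream.
def split_into_data_frames(payload, max_size:, end_stream: true)
  raise ArgumentError, "max_size must be positive" unless max_size.positive?

  rest = payload.b # binary copy, so slicing below counts bytes
  frames = []

  while rest.bytesize > max_size
    frames << { type: :data, flags: [], payload: rest.byteslice(0, max_size) }
    rest = rest.byteslice(max_size, rest.bytesize - max_size)
  end

  flags = end_stream ? [:end_stream] : []
  frames << { type: :data, flags: flags, payload: rest }
end

frames = split_into_data_frames("abcdefgh", max_size: 3)
frames.map { |frame| frame[:payload] } # => ["abc", "def", "gh"]
frames.last[:flags]                    # => [:end_stream]
```

A payload whose size equals `max_size` exactly fits in a single frame, because the loop only runs while the remainder is strictly larger than the limit.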

#headers(headers, end_headers: true, end_stream: false) ⇒ Object

Sends a HEADERS frame containing HTTP response headers.



# File 'lib/http/2/stream.rb', line 147

def headers(headers, end_headers: true, end_stream: false)
  flags = []
  flags << :end_headers if end_headers
  flags << :end_stream  if end_stream

  send({type: :headers, flags: flags, payload: headers.to_a})
end
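
To see the shape of the frame this method hands to `send`, the same construction can be reproduced outside the class. This is a sketch for inspection only; `headers_frame` is a hypothetical helper name.

```ruby
# Hypothetical helper mirroring the frame construction above, returning the
# frame hash instead of sending it.
def headers_frame(headers, end_headers: true, end_stream: false)
  flags = []
  flags << :end_headers if end_headers
  flags << :end_stream  if end_stream

  # Hash#to_a turns {"name" => "value"} into [["name", "value"]] pairs.
  { type: :headers, flags: flags, payload: headers.to_a }
end

frame = headers_frame({ ":status" => "200", "content-type" => "text/plain" },
                      end_stream: true)
frame[:flags]   # => [:end_headers, :end_stream]
frame[:payload] # => [[":status", "200"], ["content-type", "text/plain"]]
```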

#promise(headers, end_headers: true, &block) ⇒ Object

Raises:

  • (Exception)


# File 'lib/http/2/stream.rb', line 155

def promise(headers, end_headers: true, &block)
  raise Exception.new("must provide callback") if !block_given?

  flags = end_headers ? [:end_headers] : []
  emit(:promise, self, headers, flags, &block)
end

#receive(frame) ⇒ Object Also known as: <<

Processes incoming HTTP 2.0 frames. The frames must be decoded upstream.



# File 'lib/http/2/stream.rb', line 96

def receive(frame)
  transition(frame, false)

  case frame[:type]
  when :data
    @local_window -= frame[:payload].size
    emit(:data, frame[:payload]) if !frame[:ignore]
  when :headers, :push_promise
    emit(:headers, frame[:payload]) if !frame[:ignore]
  when :priority
    process_priority(frame)
  when :window_update
    process_window_update(frame)
  when :altsvc, :blocked
    emit(frame[:type], frame)
  end

  complete_transition(frame)
end

#refuse ⇒ Object

Sends a RST_STREAM indicating that the stream has been refused prior to performing any application processing.



# File 'lib/http/2/stream.rb', line 206

def refuse
  send({type: :rst_stream, error: :refused_stream})
end

#reprioritize(weight: 16, dependency: 0, exclusive: false) ⇒ Object

Sends a PRIORITY frame with the new stream priority values (can only be performed by the client, that is, on streams with an odd ID).



# File 'lib/http/2/stream.rb', line 167

def reprioritize(weight: 16, dependency: 0, exclusive: false)
  stream_error if @id.even?
  send({type: :priority, weight: weight, stream_dependency: dependency, exclusive: exclusive})
end

#send(frame) ⇒ Object

Processes outgoing HTTP 2.0 frames. Data frames may be automatically split and buffered based on maximum frame size and current stream flow control window size.



# File 'lib/http/2/stream.rb', line 122

def send(frame)
  transition(frame, true)
  frame[:stream] ||= @id

  process_priority(frame) if frame[:type] == :priority

  case frame[:type]
  when :data
    # @remote_window is maintained in send_data
    send_data(frame)
  when :window_update
    @local_window += frame[:increment]
    emit(:frame, frame)
  else
    emit(:frame, frame)
  end

  complete_transition(frame)
end
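
The local window bookkeeping spread across `receive` and `send` can be summarized in a small model: received DATA shrinks the local window, and a sent WINDOW_UPDATE grows it again. This is a hypothetical sketch (the `WindowSketch` class is not part of the library), and it counts payload bytes with `bytesize`, whereas `receive` above uses `size`.

```ruby
# Hypothetical model of the local flow control window of one stream.
class WindowSketch
  attr_reader :local_window

  def initialize(initial_window_size)
    @local_window = initial_window_size
  end

  # Mirrors the :data branch of #receive: incoming payload consumes window.
  def receive_data(payload)
    @local_window -= payload.bytesize
  end

  # Mirrors the :window_update branch of #send: the increment is restored.
  def send_window_update(increment)
    @local_window += increment
  end
end

window = WindowSketch.new(65_535) # default initial window size in HTTP/2
window.receive_data("x" * 1_000)
window.local_window               # => 64535
window.send_window_update(1_000)
window.local_window               # => 65535
```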