Class: HTTP2::Stream

Inherits:
Object
Includes:
Emitter, Error, FlowBuffer
Defined in:
lib/http/2/stream.rb

Overview

A single HTTP 2.0 connection can multiplex multiple streams in parallel: multiple requests and responses can be in flight simultaneously and stream data can be interleaved and prioritized.

This class encapsulates all of the state, transition, flow-control, and error management defined by the HTTP 2.0 specification. All you have to do is subscribe to the appropriate events (marked with a “:” prefix in the diagram below) and provide the application logic that handles request and response processing.

                      +--------+
                 PP   |        |   PP
             ,--------|  idle  |--------.
            /         |        |         \
           v          +--------+          v
    +----------+          |           +----------+
    |          |          | H         |          |
,---|:reserved |          |           |:reserved |---.
|   | (local)  |          v           | (remote) |   |
|   +----------+      +--------+      +----------+   |
|      | :active      |        |      :active |      |
|      |      ,-------|:active |-------.      |      |
|      | H   /   ES   |        |   ES   \   H |      |
|      v    v         +--------+         v    v      |
|   +-----------+          |          +-----------+  |
|   |:half_close|          |          |:half_close|  |
|   |  (remote) |          |          |  (local)  |  |
|   +-----------+          |          +-----------+  |
|        |                 v                |        |
|        |    ES/R    +--------+    ES/R    |        |
|        `----------->|        |<-----------'        |
| R                   | :close |                   R |
`-------------------->|        |<--------------------'
                      +--------+
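
One way to read the diagram is as a lookup table from (state, event) to the next state. The sketch below is a simplified, hypothetical model for illustration and is not the library's own transition code: the `TRANSITIONS` table, the `next_state` helper, the state symbols, and the `send_`/`recv_` event names are all invented here. It assumes the diagram's abbreviations follow the HTTP/2 specification (PP = PUSH_PROMISE, H = HEADERS, ES = END_STREAM flag, R = RST_STREAM).

```ruby
# Simplified model of the stream state diagram (illustrative only).
# State and event names are hypothetical; events are "send_" or "recv_"
# plus the abbreviation used in the diagram.
TRANSITIONS = {
  idle: {
    send_pp: :reserved_local,
    recv_pp: :reserved_remote,
    send_h:  :active,
    recv_h:  :active
  },
  reserved_local:  { send_h: :half_close_remote, send_r: :close, recv_r: :close },
  reserved_remote: { recv_h: :half_close_local,  send_r: :close, recv_r: :close },
  active: {
    send_es: :half_close_local,
    recv_es: :half_close_remote,
    send_r:  :close,
    recv_r:  :close
  },
  half_close_local:  { recv_es: :close, send_r: :close, recv_r: :close },
  half_close_remote: { send_es: :close, send_r: :close, recv_r: :close },
  close: {}
}.freeze

# Returns the next state, or raises if the diagram has no such edge.
def next_state(state, event)
  TRANSITIONS.fetch(state).fetch(event) do
    raise ArgumentError, "no transition from #{state} on #{event}"
  end
end

# A plain request/response exchange seen from the client side:
state = :idle
state = next_state(state, :send_h)   # request headers sent
state = next_state(state, :send_es)  # request body finished
state = next_state(state, :recv_es)  # response finished
```

A table like this makes illegal transitions fail loudly, which is the behaviour the class takes care of so that application code only has to react to events.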

Methods included from Emitter

#add_listener, #emit, #once
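
To show how event subscription of this kind behaves, here is a minimal, self-contained stand-in. It is not the library's Emitter module, whose internals are not shown on this page: `MiniEmitter`, `FakeStream`, and the `:delete` return-value convention are assumptions made for the sketch. The method names `add_listener`, `once`, and `emit` come from the list above, and `on` is the form used in the constructor source further down.

```ruby
# Minimal stand-in for an event emitter (illustrative; not the library's Emitter).
module MiniEmitter
  # Registers a block that runs on every emit of +event+.
  def add_listener(event, &block)
    raise ArgumentError, "must provide callback" unless block
    listeners(event) << block
    self
  end
  alias on add_listener

  # Registers a block that runs for the next emit of +event+ only.
  def once(event, &block)
    add_listener(event) do |*args|
      block.call(*args)
      :delete # tells #emit to drop this listener after the call
    end
  end

  # Calls every listener for +event+, removing those that return :delete.
  def emit(event, *args)
    listeners(event).delete_if { |callback| callback.call(*args) == :delete }
  end

  private

  def listeners(event)
    @listeners ||= Hash.new { |hash, key| hash[key] = [] }
    @listeners[event]
  end
end

# Hypothetical class standing in for a stream object.
class FakeStream
  include MiniEmitter
end

stream = FakeStream.new
chunks = []
first_only = []

stream.on(:data)   { |payload| chunks << payload }
stream.once(:data) { |payload| first_only << payload }

stream.emit(:data, "hello")
stream.emit(:data, "world")
```

After the two `emit` calls, `chunks` holds both payloads while `first_only` holds only the first, because the `once` listener is removed after it fires.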

Methods included from FlowBuffer

#buffered_amount

Constructor Details

#initialize(connection: nil, id: nil, weight: 16, dependency: 0, exclusive: false, parent: nil) ⇒ Stream

Initializes new stream.

Note that you should never have to call this directly. To create a new client-initiated stream, use Connection#new_stream. Similarly, Connection emits new stream objects when new stream frames are received.

Parameters:

  • connection (Connection)
  • id (Integer)
  • weight (Integer)
  • dependency (Integer)
  • exclusive (Boolean)
  • parent (Stream)


# File 'lib/http/2/stream.rb', line 75

def initialize(connection: nil, id: nil, weight: 16, dependency: 0, exclusive: false, parent: nil)
  @connection = connection or raise ArgumentError.new("missing mandatory argument connection")
  @id = id                 or raise ArgumentError.new("missing mandatory argument id")
  @weight = weight
  @dependency = dependency
  process_priority({weight: weight, stream_dependency: dependency, exclusive: exclusive})
  @local_window  = connection.local_settings[:settings_initial_window_size]
  @remote_window = connection.remote_settings[:settings_initial_window_size]
  @parent = parent
  @state  = :idle
  @error  = false
  @closed = false
  @send_buffer = []

  on(:window) { |v| @remote_window = v }
  on(:local_window) { |v| @local_window = v }
end

Instance Attribute Details

#closed ⇒ Object (readonly)

Reason why the stream was closed.



# File 'lib/http/2/stream.rb', line 61

def closed
  @closed
end

#dependency ⇒ Object (readonly)

Returns the value of attribute dependency



# File 'lib/http/2/stream.rb', line 53

def dependency
  @dependency
end

#id ⇒ Object (readonly)

Stream ID (odd for client initiated streams, even otherwise).



# File 'lib/http/2/stream.rb', line 43

def id
  @id
end

#local_window ⇒ Object (readonly) Also known as: window

Size of current stream flow control window.



# File 'lib/http/2/stream.rb', line 56

def local_window
  @local_window
end

#parent ⇒ Object (readonly)

Request parent stream of push stream.



# File 'lib/http/2/stream.rb', line 49

def parent
  @parent
end

#remote_window ⇒ Object (readonly)

Returns the value of attribute remote_window



# File 'lib/http/2/stream.rb', line 57

def remote_window
  @remote_window
end

#state ⇒ Object (readonly)

Stream state as defined by HTTP 2.0.



# File 'lib/http/2/stream.rb', line 46

def state
  @state
end

#weight ⇒ Object (readonly)

Stream priority as set by initiator.



# File 'lib/http/2/stream.rb', line 52

def weight
  @weight
end

Instance Method Details

#cancel ⇒ Object

Sends a RST_STREAM indicating that the stream is no longer needed.



# File 'lib/http/2/stream.rb', line 200

def cancel
  send({type: :rst_stream, error: :cancel})
end

#close(error = :stream_closed) ⇒ Object

Sends a RST_STREAM frame that closes the current stream. This does not close the underlying connection.

Parameters:

  • error (Symbol) (defaults to: :stream_closed)

    optional reason why stream was closed



# File 'lib/http/2/stream.rb', line 195

def close(error = :stream_closed)
  send({type: :rst_stream, error: error})
end

#data(payload, end_stream: true) ⇒ Object

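Sends a DATA frame containing the response payload. A payload larger than the peer's settings_max_frame_size is split across several DATA frames, and only the last one carries the end_stream flag.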

Parameters:

  • payload (String)
  • end_stream (Boolean)

    indicates last response DATA frame



# File 'lib/http/2/stream.rb', line 176

def data(payload, end_stream: true)
  flags = []
  flags << :end_stream if end_stream

  # Split the payload so that each frame fits within the peer's maximum frame size.
  # TODO: consider padding?
  max_size = @connection.remote_settings[:settings_max_frame_size]
  while payload.bytesize > max_size do
    chunk = payload.slice!(0, max_size)
    send({type: :data, payload: chunk})
  end

  send({type: :data, flags: flags, payload: payload})
end
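
The splitting loop above can be tried out in isolation. The sketch below is a hypothetical helper (`split_data` is not part of the library) that returns the chunks instead of sending them. It differs from the listing in two deliberate ways: it works on a binary copy so the caller's string is left untouched (`slice!` mutates its receiver), and it slices by bytes with `byteslice` so that chunk sizes match the `bytesize` check even for multibyte text.

```ruby
# Hypothetical helper mirroring the loop in #data, for experimentation only.
# Returns an array of [chunk, flags] pairs; only the final pair can carry
# the :end_stream flag.
def split_data(payload, max_size, end_stream: true)
  rest = payload.b # binary copy; the caller's string is not modified
  frames = []

  while rest.bytesize > max_size
    frames << [rest.byteslice(0, max_size), []]
    rest = rest.byteslice(max_size..-1)
  end

  frames << [rest, end_stream ? [:end_stream] : []]
end

original = "a" * 10
frames = split_data(original, 4)
sizes  = frames.map { |chunk, _flags| chunk.bytesize }
flags  = frames.map { |_chunk, frame_flags| frame_flags }
```

With a 10-byte payload and a 4-byte limit this yields three chunks, and only the last carries `:end_stream`.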

#headers(headers, end_headers: true, end_stream: false) ⇒ Object

Sends a HEADERS frame containing HTTP response headers.

Parameters:

  • headers (Array or Hash)

    Array of key-value pairs or Hash

  • end_headers (Boolean)

    indicates that no more headers will be sent

  • end_stream (Boolean)

    indicates that no payload will be sent



# File 'lib/http/2/stream.rb', line 147

def headers(headers, end_headers: true, end_stream: false)
  flags = []
  flags << :end_headers if end_headers
  flags << :end_stream  if end_stream

  send({type: :headers, flags: flags, payload: headers.to_a})
end
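
As a quick illustration of the frame that this method builds, the hypothetical helper below (`headers_frame` is not a library method) returns the hash instead of passing it to `send`. It shows that a Hash and an Array of pairs produce the same payload, since `to_a` normalises both.

```ruby
# Hypothetical helper that builds the same frame hash as #headers
# but returns it, so the result can be inspected.
def headers_frame(headers, end_headers: true, end_stream: false)
  flags = []
  flags << :end_headers if end_headers
  flags << :end_stream  if end_stream

  { type: :headers, flags: flags, payload: headers.to_a }
end

from_hash  = headers_frame({ ":status" => "200", "content-type" => "text/plain" })
from_array = headers_frame([[":status", "200"], ["content-type", "text/plain"]])
bodyless   = headers_frame({ ":status" => "204" }, end_stream: true)
```

Passing `end_stream: true` is how a response without a body would be signalled in a single frame.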

#promise(headers, end_headers: true, &block) ⇒ Object

Raises:

  • (Exception)


# File 'lib/http/2/stream.rb', line 155

def promise(headers, end_headers: true, &block)
  raise Exception.new("must provide callback") if !block_given?

  flags = end_headers ? [:end_headers] : []
  emit(:promise, self, headers, flags, &block)
end

#receive(frame) ⇒ Object Also known as: <<

Processes incoming HTTP 2.0 frames. The frames must be decoded upstream.

Parameters:

  • frame (Hash)


# File 'lib/http/2/stream.rb', line 96

def receive(frame)
  transition(frame, false)

  case frame[:type]
  when :data
    @local_window -= frame[:payload].size
    emit(:data, frame[:payload]) if !frame[:ignore]
  when :headers, :push_promise
    emit(:headers, frame[:payload]) if !frame[:ignore]
  when :priority
    process_priority(frame)
  when :window_update
    process_window_update(frame)
  when :altsvc, :blocked
    emit(frame[:type], frame)
  end

  complete_transition(frame)
end

#refuse ⇒ Object

Sends a RST_STREAM indicating that the stream has been refused prior to performing any application processing.



# File 'lib/http/2/stream.rb', line 206

def refuse
  send({type: :rst_stream, error: :refused_stream})
end

#reprioritize(weight: 16, dependency: 0, exclusive: false) ⇒ Object

Sends a PRIORITY frame with new stream priority value (can only be performed by the client).

Parameters:

  • weight (Integer)

    new stream weight value

  • dependency (Integer)

    ID of the stream that this stream now depends on

  • exclusive (Boolean)

    whether the new dependency is exclusive



# File 'lib/http/2/stream.rb', line 167

def reprioritize(weight: 16, dependency: 0, exclusive: false)
  stream_error if @id.even?
  send({type: :priority, weight: weight, stream_dependency: dependency, exclusive: exclusive})
end

#send(frame) ⇒ Object

Processes outgoing HTTP 2.0 frames. Data frames may be automatically split and buffered based on maximum frame size and current stream flow control window size.

Parameters:

  • frame (Hash)


# File 'lib/http/2/stream.rb', line 122

def send(frame)
  transition(frame, true)
  frame[:stream] ||= @id

  process_priority(frame) if frame[:type] == :priority

  case frame[:type]
  when :data
    # @remote_window is maintained in send_data
    send_data(frame)
  when :window_update
    @local_window += frame[:increment]
    emit(:frame, frame)
  else
    emit(:frame, frame)
  end

  complete_transition(frame)
end
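
The local window arithmetic in #receive and #send can be followed with a small stand-in. `WindowTracker` and its method names are hypothetical and model only the two lines that touch `@local_window`; the 65,535-byte default is the HTTP/2 specification's initial window size, whereas the real class reads the value from the connection settings. The sketch uses `bytesize` where the listing uses `size`.

```ruby
# Hypothetical stand-in that tracks only the local flow-control window.
class WindowTracker
  attr_reader :local_window

  # 65_535 is the protocol default; the library takes it from connection settings.
  def initialize(initial_window = 65_535)
    @local_window = initial_window
  end

  # Incoming DATA consumes space in the local window.
  def on_receive(frame)
    @local_window -= frame[:payload].bytesize if frame[:type] == :data
  end

  # An outgoing WINDOW_UPDATE hands space back to the peer.
  def on_send(frame)
    @local_window += frame[:increment] if frame[:type] == :window_update
  end
end

tracker = WindowTracker.new
tracker.on_receive({ type: :data, payload: "x" * 1_000 })
after_data = tracker.local_window

tracker.on_send({ type: :window_update, increment: 1_000 })
after_update = tracker.local_window
```

Receiving 1,000 bytes shrinks the window by that amount, and a WINDOW_UPDATE with the same increment restores it.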