Class: HTTP2::Stream

Inherits:
Object
  • Object
show all
Includes:
Emitter, Error, FlowBuffer
Defined in:
lib/http/2/stream.rb

Overview

A single HTTP 2.0 connection can multiplex multiple streams in parallel: multiple requests and responses can be in flight simultaneously and stream data can be interleaved and prioritized.

This class encapsulates all of the state, transition, flow-control, and error management as defined by the HTTP 2.0 specification. All you have to do is subscribe to appropriate events (marked with “:” prefix in diagram below) and provide your application logic to handle request and response processing.

                      +--------+
                 PP   |        |   PP
             ,--------|  idle  |--------.
            /         |        |         \
           v          +--------+          v
    +----------+          |           +----------+
    |          |          | H         |          |
,---|:reserved |          |           |:reserved |---.
|   | (local)  |          v           | (remote) |   |
|   +----------+      +--------+      +----------+   |
|      | :active      |        |      :active |      |
|      |      ,-------|:active |-------.      |      |
|      | H   /   ES   |        |   ES   \   H |      |
|      v    v         +--------+         v    v      |
|   +-----------+          |          +-_---------+  |
|   |:half_close|          |          |:half_close|  |
|   |  (remote) |          |          |  (local)  |  |
|   +-----------+          |          +-----------+  |
|        |                 v                |        |
|        |    ES/R    +--------+    ES/R    |        |
|        `----------->|        |<-----------'        |
| R                   | :close |                   R |
`-------------------->|        |<--------------------'
                      +--------+

Instance Attribute Summary (collapse)

Instance Method Summary (collapse)

Methods included from Emitter

#add_listener, #emit, #once

Methods included from FlowBuffer

#buffered_amount

Constructor Details

- (Stream) initialize(id, priority, window, parent = nil)

Initializes new stream.

Note that you should never have to call this directly. To create a new client initiated stream, use Connection#new_stream. Similarly, Connection will emit new stream objects, when new stream frames are received.



70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/http/2/stream.rb', line 70

def initialize(id, priority, window, parent = nil)
  @id = id
  @priority = priority
  @window = window
  @parent = parent
  @state  = :idle
  @error  = false
  @closed = false
  @send_buffer = []

  on(:window) { |v| @window = v }
end

Instance Attribute Details

- (Object) closed (readonly)

Reason why connection was closed.



58
59
60
# File 'lib/http/2/stream.rb', line 58

def closed
  @closed
end

- (Object) id (readonly)

Stream ID (odd for client initiated streams, even otherwise).



43
44
45
# File 'lib/http/2/stream.rb', line 43

def id
  @id
end

- (Object) parent (readonly)

Request parent stream of push stream.



49
50
51
# File 'lib/http/2/stream.rb', line 49

def parent
  @parent
end

- (Object) priority (readonly)

Stream priority as set by initiator.



52
53
54
# File 'lib/http/2/stream.rb', line 52

def priority
  @priority
end

- (Object) state (readonly)

Stream state as defined by HTTP 2.0.



46
47
48
# File 'lib/http/2/stream.rb', line 46

def state
  @state
end

- (Object) window (readonly)

Size of current stream flow control window.



55
56
57
# File 'lib/http/2/stream.rb', line 55

def window
  @window
end

Instance Method Details

- (Object) cancel

Sends a RST_STREAM indicating that the stream is no longer needed.



184
185
186
# File 'lib/http/2/stream.rb', line 184

def cancel
  send({type: :rst_stream, error: :cancel})
end

- (Object) close(error = :stream_closed)

Sends a RST_STREAM frame which closes current stream - this does not close the underlying connection.



179
180
181
# File 'lib/http/2/stream.rb', line 179

def close(error = :stream_closed)
  send({type: :rst_stream, error: error})
end

- (Object) data(payload, end_stream: true)

Sends DATA frame containing response payload.



163
164
165
166
167
168
169
170
171
172
173
# File 'lib/http/2/stream.rb', line 163

def data(payload, end_stream: true)
  flags = []
  flags << :end_stream if end_stream

  while payload.bytesize > MAX_FRAME_SIZE do
    chunk = payload.slice!(0, MAX_FRAME_SIZE)
    send({type: :data, payload: chunk})
  end

  send({type: :data, flags: flags, payload: payload})
end

- (Object) headers(headers, end_headers: true, end_stream: false)

Sends a HEADERS frame containing HTTP response headers.



135
136
137
138
139
140
141
# File 'lib/http/2/stream.rb', line 135

def headers(headers, end_headers: true, end_stream: false)
  flags = []
  flags << :end_headers if end_headers
  flags << :end_stream  if end_stream

  send({type: :headers, flags: flags, payload: headers.to_a})
end

- (Object) promise(headers, end_headers: true, &block)

Raises:

  • (Exception)


143
144
145
146
147
148
# File 'lib/http/2/stream.rb', line 143

def promise(headers, end_headers: true, &block)
  raise Exception.new("must provide callback") if !block_given?

  flags = end_headers ? [:end_headers] : []
  emit(:promise, self, headers, flags, &block)
end

- (Object) receive(frame) Also known as: <<

Processes incoming HTTP 2.0 frames. The frames must be decoded upstream.



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/http/2/stream.rb', line 86

def receive(frame)
  transition(frame, false)

  case frame[:type]
  when :data
    emit(:data, frame[:payload]) if !frame[:ignore]
  when :headers, :push_promise
    if frame[:payload].is_a? Array
      emit(:headers, Hash[*frame[:payload].flatten]) if !frame[:ignore]
    else
      emit(:headers, frame[:payload]) if !frame[:ignore]
    end
  when :priority
    @priority = frame[:priority]
    emit(:priority, @priority)
  when :window_update
    @window += frame[:increment]
    send_data
  end

  complete_transition(frame)
end

- (Object) refuse

Sends a RST_STREAM indicating that the stream has been refused prior to performing any application processing.



190
191
192
# File 'lib/http/2/stream.rb', line 190

def refuse
  send({type: :rst_stream, error: :refused_stream})
end

- (Object) reprioritize(p)

Sends a PRIORITY frame with new stream priority value (can only be performed by the client).



154
155
156
157
# File 'lib/http/2/stream.rb', line 154

def reprioritize(p)
  stream_error if @id.even?
  send({type: :priority, priority: p})
end

- (Object) send(frame)

Processes outgoing HTTP 2.0 frames. Data frames may be automatically split and buffered based on maximum frame size and current stream flow control window size.



115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/http/2/stream.rb', line 115

def send(frame)
  transition(frame, true)
  frame[:stream] ||= @id

  @priority = frame[:priority] if frame[:type] == :priority

  if frame[:type] == :data
    send_data(frame)
  else
    emit(:frame, frame)
  end

  complete_transition(frame)
end