Class: HTTP2::Stream

Inherits:
Object
Includes:
Emitter, Error, FlowBuffer
Defined in:
lib/http/2/stream.rb

Overview

A single HTTP 2.0 connection can multiplex multiple streams in parallel: multiple requests and responses can be in flight simultaneously and stream data can be interleaved and prioritized.

This class encapsulates all of the state, transition, flow-control, and error management defined by the HTTP 2.0 specification. All you have to do is subscribe to the appropriate events (marked with a “:” prefix in the diagram below) and provide the application logic that handles request and response processing.

                      +--------+
                 PP   |        |   PP
             ,--------|  idle  |--------.
            /         |        |         \
           v          +--------+          v
    +----------+          |           +----------+
    |          |          | H         |          |
,---|:reserved |          |           |:reserved |---.
|   | (local)  |          v           | (remote) |   |
|   +----------+      +--------+      +----------+   |
|      | :active      |        |      :active |      |
|      |      ,-------|:active |-------.      |      |
|      | H   /   ES   |        |   ES   \   H |      |
|      v    v         +--------+         v    v      |
|   +-----------+          |          +-----------+  |
|   |:half_close|          |          |:half_close|  |
|   |  (remote) |          |          |  (local)  |  |
|   +-----------+          |          +-----------+  |
|        |                 v                |        |
|        |    ES/R    +--------+    ES/R    |        |
|        `----------->|        |<-----------'        |
| R                   | :close |                   R |
`-------------------->|        |<--------------------'
                      +--------+

Legend: PP = PUSH_PROMISE frame, H = HEADERS frame, ES = END_STREAM flag, R = RST_STREAM frame.

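The diagram can be read as a lookup table. The following is a minimal, hypothetical sketch of that idea, not the library's internal implementation: the module name, the state symbols, and the event names (:send_h, :recv_es, and so on) are invented for illustration, and the send/receive direction given to each label follows the stream state diagram in the HTTP/2 specification.

```ruby
# Hypothetical model of the state diagram; names are illustrative only.
# Events: *_pp = PUSH_PROMISE, *_h = HEADERS, *_es = END_STREAM flag,
# :rst = RST_STREAM sent or received.
module StreamStateSketch
  TRANSITIONS = {
    [:idle, :send_pp]               => :reserved_local,
    [:idle, :recv_pp]               => :reserved_remote,
    [:idle, :send_h]                => :active,
    [:idle, :recv_h]                => :active,
    [:reserved_local, :send_h]      => :half_closed_remote,
    [:reserved_remote, :recv_h]     => :half_closed_local,
    [:active, :send_es]             => :half_closed_local,
    [:active, :recv_es]             => :half_closed_remote,
    [:half_closed_local, :recv_es]  => :closed,
    [:half_closed_remote, :send_es] => :closed
  }.freeze

  # Returns the state that follows +state+ when +event+ occurs.
  def self.next_state(state, event)
    # RST_STREAM closes the stream from every state except idle.
    return :closed if event == :rst && state != :idle

    TRANSITIONS.fetch([state, event]) do
      raise ArgumentError, "no transition from #{state} on #{event}"
    end
  end
end

# A request arrives, the peer finishes sending, then we finish our response.
state = :idle
%i[recv_h recv_es send_es].each do |event|
  state = StreamStateSketch.next_state(state, event)
end
puts state # prints "closed"
```

Keeping the table as data makes illegal transitions easy to spot: any pair missing from the hash is rejected with an error.
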
Instance Attribute Summary

Instance Method Summary

Methods included from Emitter

#add_listener, #emit, #once
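
To illustrate the subscription model behind these methods, here is a minimal sketch of an emitter. TinyEmitter and FakeStream are hypothetical stand-ins: the method names match the list above (plus the on form used in the constructor), but the bodies, including the :delete convention used to implement once, are assumptions rather than the library's code.

```ruby
# Hypothetical stand-in for the Emitter mixin; bodies are assumptions.
module TinyEmitter
  # Registers a callback for +event+ and returns it.
  def add_listener(event, &block)
    raise ArgumentError, "must provide callback" unless block

    listeners(event) << block
    block
  end
  alias on add_listener

  # Registers a callback that is removed after its first invocation.
  def once(event, &block)
    add_listener(event) do |*args, &callback|
      block.call(*args, &callback)
      :delete
    end
  end

  # Calls every listener for +event+; a listener whose block returns
  # :delete is dropped afterwards.
  def emit(event, *args, &block)
    listeners(event).delete_if { |cb| cb.call(*args, &block) == :delete }
  end

  private

  def listeners(event)
    @listeners ||= Hash.new { |hash, key| hash[key] = [] }
    @listeners[event]
  end
end

class FakeStream
  include TinyEmitter
end

stream = FakeStream.new
received = []
stream.on(:headers) { |h| received << [:headers, h] }
stream.on(:data)    { |d| received << [:data, d] }
stream.once(:data)  { |d| received << [:first_chunk, d] }

stream.emit(:headers, { ":status" => "200" })
stream.emit(:data, "hello")
stream.emit(:data, "world")
# received now holds one :headers entry, two :data entries,
# and a single :first_chunk entry for "hello".
```
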

Methods included from FlowBuffer

#buffered_amount

Constructor Details

- (Stream) initialize(id, priority, window, parent = nil)

Initializes new stream.

Note that you should never have to call this directly. To create a new client-initiated stream, use Connection#new_stream. Similarly, Connection emits new stream objects when new stream frames are received.

Parameters:

  • id (Integer)
  • priority (Integer)
  • window (Integer)
  • parent (Stream) (defaults to: nil)


# File 'lib/http/2/stream.rb', line 70

def initialize(id, priority, window, parent = nil)
  @id = id
  @priority = priority
  @window = window
  @parent = parent
  @state  = :idle
  @error  = false
  @closed = false
  @send_buffer = []

  on(:window) { |v| @window = v }
end

Instance Attribute Details

- (Object) closed (readonly)

Reason why the stream was closed.



# File 'lib/http/2/stream.rb', line 58

def closed
  @closed
end

- (Object) id (readonly)

Stream ID (odd for client-initiated streams, even otherwise).



# File 'lib/http/2/stream.rb', line 43

def id
  @id
end
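
The parity rule can be expressed as a pair of predicates. These helpers are hypothetical and not part of HTTP2::Stream; the treatment of ID 0 reflects the HTTP/2 specification, which reserves it for connection-level frames.

```ruby
# Hypothetical helpers illustrating the stream ID parity rule.

# Odd IDs belong to streams opened by the client.
def client_initiated?(stream_id)
  stream_id.odd?
end

# Even, non-zero IDs belong to streams opened by the server (pushes).
# ID 0 is excluded because it addresses the connection itself.
def server_initiated?(stream_id)
  stream_id.positive? && stream_id.even?
end

puts client_initiated?(1) # prints "true"
puts server_initiated?(2) # prints "true"
puts server_initiated?(0) # prints "false"
```
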

- (Object) parent (readonly)

Parent request stream of a push stream.



# File 'lib/http/2/stream.rb', line 49

def parent
  @parent
end

- (Object) priority (readonly)

Stream priority as set by initiator.



# File 'lib/http/2/stream.rb', line 52

def priority
  @priority
end

- (Object) state (readonly)

Stream state as defined by HTTP 2.0.



# File 'lib/http/2/stream.rb', line 46

def state
  @state
end

- (Object) window (readonly)

Size of current stream flow control window.



# File 'lib/http/2/stream.rb', line 55

def window
  @window
end

Instance Method Details

- (Object) cancel

Sends a RST_STREAM indicating that the stream is no longer needed.



# File 'lib/http/2/stream.rb', line 184

def cancel
  send({type: :rst_stream, error: :cancel})
end

- (Object) close(error = :stream_closed)

Sends a RST_STREAM frame, which closes the current stream; this does not close the underlying connection.

Parameters:

  • error (Symbol) (defaults to: :stream_closed)

    optional reason why stream was closed



# File 'lib/http/2/stream.rb', line 179

def close(error = :stream_closed)
  send({type: :rst_stream, error: error})
end

- (Object) data(payload, end_stream: true)

Sends a DATA frame containing the response payload. Payloads larger than MAX_FRAME_SIZE are split across several DATA frames.

Parameters:

  • payload (String)
  • end_stream (Boolean) (defaults to: true)

    indicates last response DATA frame



# File 'lib/http/2/stream.rb', line 163

def data(payload, end_stream: true)
  flags = []
  flags << :end_stream if end_stream

  while payload.bytesize > MAX_FRAME_SIZE do
    chunk = payload.slice!(0, MAX_FRAME_SIZE)
    send({type: :data, payload: chunk})
  end

  send({type: :data, flags: flags, payload: payload})
end
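
The splitting step can be shown in isolation. The following sketch uses a hypothetical helper, chunk_data, that returns the frames instead of sending them and takes the frame size limit as a parameter, since the value of MAX_FRAME_SIZE is not shown on this page. It slices by bytes with String#byteslice and leaves the input string untouched, which is a deliberate difference from the listing above.

```ruby
# Hypothetical helper: builds the DATA frames for one payload.
# Only the last frame carries the :end_stream flag.
def chunk_data(payload, max_frame_size, end_stream: true)
  frames = []
  offset = 0

  # Emit full-size frames while more than one frame's worth remains.
  while payload.bytesize - offset > max_frame_size
    chunk = payload.byteslice(offset, max_frame_size)
    frames << { type: :data, flags: [], payload: chunk }
    offset += max_frame_size
  end

  # The remainder (possibly the whole payload) goes into the final frame.
  flags = end_stream ? [:end_stream] : []
  rest  = payload.byteslice(offset, payload.bytesize - offset)
  frames << { type: :data, flags: flags, payload: rest }
  frames
end

frames = chunk_data("a" * 10, 4)
p frames.map { |f| [f[:payload].bytesize, f[:flags]] }
# prints [[4, []], [4, []], [2, [:end_stream]]]
```

Note that the listed implementation calls slice! on its argument, so a payload larger than the frame limit is consumed in place; callers that need the original string afterwards may want to pass a copy (payload.dup).
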

- (Object) headers(headers, end_headers: true, end_stream: false)

Sends a HEADERS frame containing HTTP response headers.

Parameters:

  • headers (Hash)
  • end_headers (Boolean) (defaults to: true)

    indicates that no more headers will be sent

  • end_stream (Boolean) (defaults to: false)

    indicates that no payload will be sent



# File 'lib/http/2/stream.rb', line 135

def headers(headers, end_headers: true, end_stream: false)
  flags = []
  flags << :end_headers if end_headers
  flags << :end_stream  if end_stream

  send({type: :headers, flags: flags, payload: headers.to_a})
end

- (Object) promise(headers, end_push_promise: true, &block)

Emits a :promise event carrying the promised headers and flags. A callback block is required.

Raises:

  • (Exception)


# File 'lib/http/2/stream.rb', line 143

def promise(headers, end_push_promise: true, &block)
  raise Exception.new("must provide callback") if !block_given?

  flags = end_push_promise ? [:end_push_promise] : []
  emit(:promise, self, headers, flags, &block)
end

- (Object) receive(frame) Also known as: <<

Processes incoming HTTP 2.0 frames. The frames must be decoded upstream.

Parameters:

  • frame (Hash)


# File 'lib/http/2/stream.rb', line 86

def receive(frame)
  transition(frame, false)

  case frame[:type]
  when :data
    emit(:data, frame[:payload]) if !frame[:ignore]
  when :headers, :push_promise
    if frame[:payload].is_a? Array
      emit(:headers, Hash[*frame[:payload].flatten]) if !frame[:ignore]
    else
      emit(:headers, frame[:payload]) if !frame[:ignore]
    end
  when :priority
    @priority = frame[:priority]
    emit(:priority, @priority)
  when :window_update
    @window += frame[:increment]
    send_data
  end

  complete_transition(frame)
end
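
The header handling in receive mirrors what headers does on the sending side: headers serializes a Hash with to_a, and receive turns an Array payload back into a Hash. The sketch below isolates that round trip; headers_from_payload is a hypothetical name used only for this illustration.

```ruby
# Hypothetical helper reproducing the payload conversion done in receive.
def headers_from_payload(payload)
  # An Array payload is a list of [name, value] pairs; anything else is
  # passed through unchanged.
  payload.is_a?(Array) ? Hash[*payload.flatten] : payload
end

sent  = { ":status" => "200", "content-type" => "text/plain" }
pairs = sent.to_a
p pairs
# prints [[":status", "200"], ["content-type", "text/plain"]]
p headers_from_payload(pairs) == sent
# prints true

# Repeated header names collapse to the last value seen.
p headers_from_payload([["set-cookie", "a=1"], ["set-cookie", "b=2"]])
# prints {"set-cookie"=>"b=2"} (Ruby 3.4+ prints {"set-cookie" => "b=2"})
```

Because the result is a Hash, a listener on :headers cannot observe repeated header names in a pair list; only the last value survives the conversion.
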

- (Object) refuse

Sends a RST_STREAM indicating that the stream has been refused prior to performing any application processing.



# File 'lib/http/2/stream.rb', line 190

def refuse
  send({type: :rst_stream, error: :refused_stream})
end

- (Object) reprioritize(p)

Sends a PRIORITY frame with new stream priority value (can only be performed by the client).

Parameters:

  • p (Integer)

    new stream priority value



# File 'lib/http/2/stream.rb', line 154

def reprioritize(p)
  stream_error if @id.even?
  send({type: :priority, priority: p})
end

- (Object) send(frame)

Processes outgoing HTTP 2.0 frames. Data frames may be automatically split and buffered based on maximum frame size and current stream flow control window size.

Parameters:

  • frame (Hash)


# File 'lib/http/2/stream.rb', line 115

def send(frame)
  transition(frame, true)
  frame[:stream] ||= @id

  @priority = frame[:priority] if frame[:type] == :priority

  if frame[:type] == :data
    send_data(frame)
  else
    emit(:frame, frame)
  end

  complete_transition(frame)
end
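
The buffering behaviour described for DATA frames can be sketched as follows. WindowedSender is a hypothetical class written for this illustration; the FlowBuffer internals are not shown on this page, so the splitting and queueing logic here is an assumption about how such a buffer could work, with window_update mirroring the :window_update branch of receive.

```ruby
# Hypothetical model of flow-controlled sending; not the FlowBuffer code.
class WindowedSender
  attr_reader :window, :sent

  def initialize(window)
    @window = window # bytes the peer currently allows us to send
    @buffer = []     # DATA frames waiting for window space
    @sent   = []     # frames that were actually emitted
  end

  # Queues +frame+ (if given) and emits as much as the window permits.
  def send_data(frame = nil)
    @buffer << frame if frame

    until @buffer.empty? || @window <= 0
      head = @buffer.shift
      size = head[:payload].bytesize

      if size > @window
        # Emit only what fits; the remainder keeps the original flags
        # (for example :end_stream) and returns to the front of the queue.
        rest = head[:payload].byteslice(@window, size - @window)
        @buffer.unshift(head.merge(payload: rest))
        head = head.merge(payload: head[:payload].byteslice(0, @window), flags: [])
        size = @window
      end

      @sent << head
      @window -= size
    end
  end

  # Grows the window and flushes whatever was waiting.
  def window_update(increment)
    @window += increment
    send_data
  end

  # Bytes still queued behind the flow-control window.
  def buffered_amount
    @buffer.sum { |f| f[:payload].bytesize }
  end
end

sender = WindowedSender.new(5)
sender.send_data({ type: :data, flags: [:end_stream], payload: "hello world" })
p sender.sent.map { |f| f[:payload] } # prints ["hello"]
p sender.buffered_amount              # prints 6

sender.window_update(10)
p sender.sent.map { |f| f[:payload] } # prints ["hello", " world"]
p sender.window                       # prints 4
```
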