Class: Protocol::HTTP2::Connection

Inherits:
Object
  • Object
show all
Includes:
FlowControl
Defined in:
lib/protocol/http2/connection.rb

Direct Known Subclasses

Client, Server

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from FlowControl

#available_frame_size, #consume_local_window, #consume_remote_window, #send_window_update

Constructor Details

#initialize(framer, local_stream_id) ⇒ Connection

Returns a new instance of Connection.



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/protocol/http2/connection.rb', line 31

def initialize(framer, local_stream_id)
  @state = :new
  @streams = {}
  
  @framer = framer
  @local_stream_id = local_stream_id
  @remote_stream_id = 0
  
  @local_settings = PendingSettings.new
  @remote_settings = Settings.new
  
  @decoder = HPACK::Context.new
  @encoder = HPACK::Context.new
  
  @local_window = Window.new(@local_settings.initial_window_size)
  @remote_window = Window.new(@remote_settings.initial_window_size)
end

Instance Attribute Details

#framerObject (readonly)

Returns the value of attribute framer.



61
62
63
# File 'lib/protocol/http2/connection.rb', line 61

def framer
  @framer
end

#local_settingsObject

Current settings value for local and peer



67
68
69
# File 'lib/protocol/http2/connection.rb', line 67

def local_settings
  @local_settings
end

#local_windowObject (readonly)

Our window for receiving data. When we receive data, it reduces this window. If the window gets too small, we must send a window update.



72
73
74
# File 'lib/protocol/http2/connection.rb', line 72

def local_window
  @local_window
end

#remote_settingsObject

Returns the value of attribute remote_settings.



68
69
70
# File 'lib/protocol/http2/connection.rb', line 68

def remote_settings
  @remote_settings
end

#remote_windowObject (readonly)

Our window for sending data. When we send data, it reduces this window.



75
76
77
# File 'lib/protocol/http2/connection.rb', line 75

def remote_window
  @remote_window
end

#stateObject

Connection state (:new, :open, :closed).



64
65
66
# File 'lib/protocol/http2/connection.rb', line 64

def state
  @state
end

#streamsObject (readonly)

Returns the value of attribute streams.



106
107
108
# File 'lib/protocol/http2/connection.rb', line 106

def streams
  @streams
end

Instance Method Details

#closeObject



81
82
83
84
85
# File 'lib/protocol/http2/connection.rb', line 81

def close
  send_goaway
  
  @framer.close
end

#closed?Boolean

Returns:

  • (Boolean)


77
78
79
# File 'lib/protocol/http2/connection.rb', line 77

def closed?
  @state == :closed
end

#create_stream(stream_id = next_stream_id) ⇒ Object



255
256
257
# File 'lib/protocol/http2/connection.rb', line 255

def create_stream(stream_id = next_stream_id)
  Stream.new(self, stream_id)
end

#decode_headers(data) ⇒ Object



93
94
95
# File 'lib/protocol/http2/connection.rb', line 93

def decode_headers(data)
  HPACK::Decompressor.new(data, @decoder).decode
end

#deleted_stream?(frame) ⇒ Boolean

Returns:

  • (Boolean)


283
284
285
# File 'lib/protocol/http2/connection.rb', line 283

def deleted_stream? frame
  frame.stream_id <= @local_stream_id or frame.stream_id <= @remote_stream_id
end

#encode_headers(headers, buffer = String.new.b) ⇒ Object



87
88
89
90
91
# File 'lib/protocol/http2/connection.rb', line 87

def encode_headers(headers, buffer = String.new.b)
  HPACK::Compressor.new(buffer, @encoder).encode(headers)
  
  return buffer
end

#idObject



49
50
51
# File 'lib/protocol/http2/connection.rb', line 49

def id
  0
end

#maximum_concurrent_streamsObject



57
58
59
# File 'lib/protocol/http2/connection.rb', line 57

def maximum_concurrent_streams
  [@local_settings.maximum_concurrent_streams, @remote_settings.maximum_concurrent_streams].min
end

#maximum_frame_sizeObject



53
54
55
# File 'lib/protocol/http2/connection.rb', line 53

def maximum_frame_size
  @remote_settings.maximum_frame_size
end

#next_stream_idObject

Streams are identified with an unsigned 31-bit integer. Streams initiated by a client MUST use odd-numbered stream identifiers; those initiated by the server MUST use even-numbered stream identifiers. A stream identifier of zero (0x0) is used for connection control messages; the stream identifier of zero cannot be used to establish a new stream.



98
99
100
101
102
103
104
# File 'lib/protocol/http2/connection.rb', line 98

def next_stream_id
  id = @local_stream_id
  
  @local_stream_id += 2
  
  return id
end

#open!Object



212
213
214
215
216
# File 'lib/protocol/http2/connection.rb', line 212

def open!
  @state = :open
  
  return self
end

#process_settings(frame) ⇒ Boolean

In addition to changing the flow-control window for streams that are not yet active, a SETTINGS frame can alter the initial flow-control window size for streams with active flow-control windows (that is, streams in the “open” or “half-closed (remote)” state). When the value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the size of all stream flow-control windows that it maintains by the difference between the new value and the old value.

Returns:

  • (Boolean)

    whether the frame was an acknowledgement



189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# File 'lib/protocol/http2/connection.rb', line 189

def process_settings(frame)
  if frame.acknowledgement?
    # The remote end has confirmed the settings have been received:
    changes = @local_settings.acknowledge
    
    update_local_settings(changes)
    
    return true
  else
    # The remote end is updating the settings, we reply with acknowledgement:
    reply = frame.acknowledge
    
    write_frame(reply)
    
    changes = frame.unpack
    @remote_settings.update(changes)
    
    update_remote_settings(changes)
    
    return false
  end
end

#read_frameObject



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/protocol/http2/connection.rb', line 108

def read_frame
  frame = @framer.read_frame(@local_settings.maximum_frame_size)
  # puts "#{self.class} #{@state} read_frame: class=#{frame.class} flags=#{frame.flags} length=#{frame.length}"
  # puts "Windows: local_window=#{@local_window.inspect}; remote_window=#{@remote_window.inspect}"
  
  yield frame if block_given?
  
  frame.apply(self)
  
  return frame
rescue ProtocolError => error
  send_goaway(error.code || PROTOCOL_ERROR, error.message)
  
  raise
rescue HPACK::CompressionError => error
  send_goaway(COMPRESSION_ERROR, error.message)
  
  raise
rescue
  send_goaway(PROTOCOL_ERROR, $!.message)
  
  raise
end

#receive_data(frame) ⇒ Object



241
242
243
244
245
246
247
248
249
250
251
252
253
# File 'lib/protocol/http2/connection.rb', line 241

def receive_data(frame)
  consume_local_window(frame)
  
  if stream = @streams[frame.stream_id]
    stream.receive_data(frame)
    
    if stream.closed?
      @streams.delete(stream.id)
    end
  else
    raise ProtocolError, "Bad stream"
  end
end

#receive_frame(frame) ⇒ Object



331
332
333
# File 'lib/protocol/http2/connection.rb', line 331

def receive_frame(frame)
  warn "Unhandled frame #{frame.inspect}"
end

#receive_goaway(frame) ⇒ Object



150
151
152
# File 'lib/protocol/http2/connection.rb', line 150

def receive_goaway(frame)
  @state = :closed
end

#receive_headers(frame) ⇒ Object



259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
# File 'lib/protocol/http2/connection.rb', line 259

def receive_headers(frame)
  if frame.stream_id == 0
    raise ProtocolError, "Cannot receive headers for stream 0!"
  end
  
  if stream = @streams[frame.stream_id]
    stream.receive_headers(frame)
    
    if stream.closed?
      @streams.delete(stream.id)
    end
  elsif frame.stream_id > @remote_stream_id
    if @streams.count < self.maximum_concurrent_streams
      stream = create_stream(frame.stream_id)
      stream.receive_headers(frame)
      
      @remote_stream_id = stream.id
      @streams[stream.id] = stream
    else
      raise ProtocolError, "Exceeded maximum concurrent streams"
    end
  end
end

#receive_ping(frame) ⇒ Object



229
230
231
232
233
234
235
236
237
238
239
# File 'lib/protocol/http2/connection.rb', line 229

def receive_ping(frame)
  if @state != :closed
    unless frame.acknowledgement?
      reply = frame.acknowledge
      
      write_frame(reply)
    end
  else
    raise ProtocolError, "Cannot receive ping in state #{@state}"
  end
end

#receive_priority(frame) ⇒ Object



287
288
289
290
291
292
293
294
295
296
297
298
# File 'lib/protocol/http2/connection.rb', line 287

def receive_priority(frame)
  if stream = @streams[frame.stream_id]
    stream.receive_priority(frame)
  elsif deleted_stream? frame
    # ignore
  else
    stream = create_stream(frame.stream_id)
    stream.receive_priority(frame)
    
    @streams[frame.stream_id] = stream
  end
end

#receive_reset_stream(frame) ⇒ Object



300
301
302
303
304
305
306
307
308
309
310
# File 'lib/protocol/http2/connection.rb', line 300

def receive_reset_stream(frame)
  if stream = @streams[frame.stream_id]
    stream.receive_reset_stream(frame)
    
    @streams.delete(stream.id)
  elsif deleted_stream? frame
    # ignore
  else
    raise ProtocolError, "Bad stream"
  end
end

#receive_settings(frame) ⇒ Object



218
219
220
221
222
223
224
225
226
227
# File 'lib/protocol/http2/connection.rb', line 218

def receive_settings(frame)
  if @state == :new
    # We transition to :open when we receive acknowledgement of first settings frame:
    open! if process_settings(frame)
  elsif @state != :closed
    process_settings(frame)
  else
    raise ProtocolError, "Cannot receive settings in state #{@state}"
  end
end

#receive_window_update(frame) ⇒ Object



312
313
314
315
316
317
318
319
320
321
322
# File 'lib/protocol/http2/connection.rb', line 312

def receive_window_update(frame)
  if frame.connection?
    super
  elsif stream = @streams[frame.stream_id]
    stream.receive_window_update(frame)
  elsif deleted_stream? frame
    # ignore
  else
    raise ProtocolError, "Cannot update window of non-existant stream: #{frame.stream_id}"
  end
end

#send_goaway(error_code = 0, message = "") ⇒ Object



141
142
143
144
145
146
147
148
# File 'lib/protocol/http2/connection.rb', line 141

def send_goaway(error_code = 0, message = "")
  frame = GoawayFrame.new
  frame.pack @remote_stream_id, error_code, message
  
  write_frame(frame)
  
  @state = :closed
end

#send_ping(data) ⇒ Object



159
160
161
162
163
164
165
166
167
168
# File 'lib/protocol/http2/connection.rb', line 159

def send_ping(data)
  if @state != :closed
    frame = PingFrame.new
    frame.pack data
    
    write_frame(frame)
  else
    raise ProtocolError, "Cannot send ping in state #{@state}"
  end
end

#send_settings(changes) ⇒ Object



132
133
134
135
136
137
138
139
# File 'lib/protocol/http2/connection.rb', line 132

def send_settings(changes)
  @local_settings.append(changes)
  
  frame = SettingsFrame.new
  frame.pack(changes)
  
  write_frame(frame)
end

#update_local_settings(changes) ⇒ Object



170
171
172
173
174
175
176
# File 'lib/protocol/http2/connection.rb', line 170

def update_local_settings(changes)
  capacity = @local_settings.initial_window_size
  
  @streams.each_value do |stream|
    stream.local_window.capacity = capacity
  end
end

#update_remote_settings(changes) ⇒ Object



178
179
180
181
182
183
184
# File 'lib/protocol/http2/connection.rb', line 178

def update_remote_settings(changes)
  capacity = @remote_settings.initial_window_size
  
  @streams.each_value do |stream|
    stream.remote_window.capacity = capacity
  end
end

#window_updatedObject



324
325
326
327
328
329
# File 'lib/protocol/http2/connection.rb', line 324

def window_updated
  # This is very inefficient, but workable.
  @streams.each_value do |stream|
    stream.window_updated unless stream.closed?
  end
end

#write_frame(frame) ⇒ Object



154
155
156
157
# File 'lib/protocol/http2/connection.rb', line 154

def write_frame(frame)
  # puts "#{self.class} #{@state} write_frame: class=#{frame.class} flags=#{frame.flags} length=#{frame.length}"
  @framer.write_frame(frame)
end