Class: Protocol::HTTP2::Connection

Inherits:
Object
  • Object
show all
Includes:
FlowControl
Defined in:
lib/protocol/http2/connection.rb

Direct Known Subclasses

Client, Server

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from FlowControl

#available_frame_size, #consume_local_window, #consume_remote_window, #send_window_update

Constructor Details

#initialize(framer, local_stream_id) ⇒ Connection

Returns a new instance of Connection.



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/protocol/http2/connection.rb', line 31

# Set up a connection in the :new state with no streams.
#
# @param framer the object frames are read from and written to (see
#   #read_frame and #write_frame).
# @param local_stream_id the first stream id handed out by #next_stream_id.
def initialize(framer, local_stream_id)
	@framer = framer
	
	# Lifecycle state and the table of streams, keyed by stream id:
	@state = :new
	@streams = {}
	
	# Stream id bookkeeping. The remote id is the highest stream id accepted so far:
	@local_stream_id = local_stream_id
	@remote_stream_id = 0
	
	# Local settings are pending until the peer acknowledges them:
	@local_settings = PendingSettings.new
	@remote_settings = Settings.new
	
	# Separate header compression contexts for each direction:
	@decoder = HPACK::Context.new
	@encoder = HPACK::Context.new
	
	# Connection-level flow control windows, sized from the initial settings:
	@local_window = Window.new(@local_settings.initial_window_size)
	@remote_window = Window.new(@remote_settings.initial_window_size)
end

Instance Attribute Details

#framerObject (readonly)

Returns the value of attribute framer.



62
63
64
# File 'lib/protocol/http2/connection.rb', line 62

# The framer given to the constructor; frames are read from and written to it.
def framer
	return @framer
end

#local_settingsObject

The current settings values for the local endpoint and the peer.



68
69
70
# File 'lib/protocol/http2/connection.rb', line 68

# The settings for this end of the connection.
def local_settings
	return @local_settings
end

#local_windowObject (readonly)

Our window for receiving data. When we receive data, it reduces this window. If the window gets too small, we must send a window update.



73
74
75
# File 'lib/protocol/http2/connection.rb', line 73

# Our window for receiving data. Receiving data reduces this window; once it
# gets too small a window update must be sent.
def local_window
	return @local_window
end

#remote_settingsObject

Returns the value of attribute remote_settings.



69
70
71
# File 'lib/protocol/http2/connection.rb', line 69

# The settings most recently received from the peer.
def remote_settings
	return @remote_settings
end

#remote_windowObject (readonly)

Our window for sending data. When we send data, it reduces this window.



76
77
78
# File 'lib/protocol/http2/connection.rb', line 76

# Our window for sending data. Sending data reduces this window.
def remote_window
	return @remote_window
end

#stateObject

Connection state (:new, :open, :closed).



65
66
67
# File 'lib/protocol/http2/connection.rb', line 65

# The connection state: one of :new, :open or :closed.
def state
	return @state
end

#streamsObject (readonly)

Returns the value of attribute streams.



113
114
115
# File 'lib/protocol/http2/connection.rb', line 113

# The table of streams known to this connection, keyed by stream id.
def streams
	return @streams
end

Instance Method Details

#accept_stream(stream_id) ⇒ Object

Accept a stream from the other side of the connection.



278
279
280
281
282
283
284
285
286
287
288
289
# File 'lib/protocol/http2/connection.rb', line 278

# Accept a stream that was initiated by the remote peer.
#
# The id must pass #valid_remote_stream_id? and must be strictly greater than
# every remote stream id accepted so far.
#
# @param stream_id the id of the stream the peer is opening.
# @return the newly created stream (the result of #create_stream).
# @raise [ProtocolError] if the id is invalid or not increasing.
def accept_stream(stream_id)
	raise ProtocolError, "Invalid remote stream id: #{stream_id}" unless valid_remote_stream_id?(stream_id)
	
	# Remote stream ids must only ever increase:
	raise ProtocolError, "Invalid stream id: #{stream_id} <= #{@remote_stream_id}!" if stream_id <= @remote_stream_id
	
	@remote_stream_id = stream_id
	create_stream(stream_id)
end

#closeObject



90
91
92
# File 'lib/protocol/http2/connection.rb', line 90

# Close the underlying framer. This does not change the connection state.
#
# @return whatever the framer's #close returns.
def close
	return @framer.close
end

#closed?Boolean

Returns:

  • (Boolean)


78
79
80
# File 'lib/protocol/http2/connection.rb', line 78

# Whether the connection has reached the :closed state.
#
# @return [Boolean]
def closed?
	return @state == :closed
end

#connection_error!(error, message) ⇒ Object



82
83
84
85
86
87
88
# File 'lib/protocol/http2/connection.rb', line 82

# Report a connection-level error to the peer and shut the connection down.
#
# Does nothing if the framer is already closed. Otherwise a GOAWAY frame
# carrying the error is sent and the connection is closed.
#
# @param error the error code to send in the GOAWAY frame.
# @param message the debug message to send in the GOAWAY frame.
def connection_error!(error, message)
	unless @framer.closed?
		send_goaway(error, message)
		
		self.close
	end
end

#create_stream(stream_id = next_stream_id) ⇒ Object

Create a stream on this side of the connection.



292
293
294
# File 'lib/protocol/http2/connection.rb', line 292

# Create a stream and register it in the stream table.
#
# @param stream_id the id for the new stream; defaults to the next locally
#   allocated id (see #next_stream_id).
# @return the newly created stream.
def create_stream(stream_id = next_stream_id)
	stream = Stream.new(self, stream_id)
	
	@streams[stream_id] = stream
end

#decode_headers(data) ⇒ Object



100
101
102
# File 'lib/protocol/http2/connection.rb', line 100

# Decode a compressed header block using the connection's decoding context.
#
# @param data the encoded header block.
# @return the result of HPACK::Decompressor#decode.
def decode_headers(data)
	decompressor = HPACK::Decompressor.new(data, @decoder)
	
	decompressor.decode
end

#encode_headers(headers, buffer = String.new.b) ⇒ Object



94
95
96
97
98
# File 'lib/protocol/http2/connection.rb', line 94

# Encode headers into a buffer using the connection's encoding context.
#
# @param headers the headers to encode.
# @param buffer the buffer handed to the compressor; defaults to a new empty
#   binary string.
# @return the same buffer that was passed in (or the default one).
def encode_headers(headers, buffer = String.new.b)
	compressor = HPACK::Compressor.new(buffer, @encoder)
	compressor.encode(headers)
	
	buffer
end

#idObject



49
50
51
# File 'lib/protocol/http2/connection.rb', line 49

# The stream id of the connection itself. Stream identifier zero is used for
# connection control messages.
#
# @return [Integer] always 0.
def id
	return 0
end

#maximum_concurrent_streamsObject



58
59
60
# File 'lib/protocol/http2/connection.rb', line 58

# The effective limit on concurrent streams: the smaller of the limits in the
# local and remote settings.
def maximum_concurrent_streams
	local_limit = @local_settings.maximum_concurrent_streams
	remote_limit = @remote_settings.maximum_concurrent_streams
	
	[local_limit, remote_limit].min
end

#maximum_frame_sizeObject

The size of a frame payload is limited by the maximum size that a receiver advertises in the SETTINGS_MAX_FRAME_SIZE setting.



54
55
56
# File 'lib/protocol/http2/connection.rb', line 54

# The size of a frame payload is limited by the maximum size the receiver
# advertises in SETTINGS_MAX_FRAME_SIZE, so this reports the limit from the
# remote settings.
def maximum_frame_size
	return @remote_settings.maximum_frame_size
end

#next_stream_idObject

Streams are identified with an unsigned 31-bit integer. Streams initiated by a client MUST use odd-numbered stream identifiers; those initiated by the server MUST use even-numbered stream identifiers. A stream identifier of zero (0x0) is used for connection control messages; the stream identifier of zero cannot be used to establish a new stream.



105
106
107
108
109
110
111
# File 'lib/protocol/http2/connection.rb', line 105

# Allocate the next locally initiated stream id.
#
# Ids advance in steps of two, so they keep the parity (odd or even) of the
# initial local stream id given to the constructor.
#
# @return the allocated stream id.
def next_stream_id
	allocated = @local_stream_id
	
	@local_stream_id = allocated + 2
	
	allocated
end

#open!Object



230
231
232
233
234
# File 'lib/protocol/http2/connection.rb', line 230

# Move the connection into the :open state.
#
# @return this connection, to allow chaining.
def open!
	@state = :open
	
	self
end

#process_settings(frame) ⇒ Boolean

In addition to changing the flow-control window for streams that are not yet active, a SETTINGS frame can alter the initial flow-control window size for streams with active flow-control windows (that is, streams in the “open” or “half-closed (remote)” state). When the value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the size of all stream flow-control windows that it maintains by the difference between the new value and the old value.

Returns:

  • (Boolean)

    whether the frame was an acknowledgement



207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# File 'lib/protocol/http2/connection.rb', line 207

# Apply a SETTINGS frame.
#
# An acknowledgement confirms our pending local settings, which are then
# applied. Any other SETTINGS frame is acknowledged first, and its values are
# then stored as the remote settings and applied.
#
# @param frame the received SETTINGS frame.
# @return [Boolean] whether the frame was an acknowledgement.
def process_settings(frame)
	unless frame.acknowledgement?
		# The remote end is updating the settings, we reply with acknowledgement:
		write_frame(frame.acknowledge)
		
		remote_changes = frame.unpack
		@remote_settings.update(remote_changes)
		
		update_remote_settings(remote_changes)
		
		return false
	end
	
	# The remote end has confirmed the settings have been received:
	local_changes = @local_settings.acknowledge
	
	update_local_settings(local_changes)
	
	true
end

#read_frameObject



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/protocol/http2/connection.rb', line 115

# Read one frame from the framer and apply it to this connection.
#
# The frame is yielded to the block (if one is given) before it is applied.
# Every error is re-raised after the connection has been shut down: a
# GoawayError just closes the connection, anything else is first reported to
# the peer through #connection_error!.
#
# @yield [frame] the frame that was read, before it is applied.
# @return the frame that was read.
def read_frame
	frame = @framer.read_frame(@local_settings.maximum_frame_size)
	# puts "#{self.class} #{@state} read_frame: class=#{frame.class} flags=#{frame.flags} length=#{frame.length}"
	# puts "Windows: local_window=#{@local_window.inspect}; remote_window=#{@remote_window.inspect}"
	
	yield(frame) if block_given?
	
	frame.apply(self)
	
	frame
rescue GoawayError
	# This is not a connection error. We are done.
	self.close
	
	raise
rescue ProtocolError => protocol_error
	# Fall back to the generic protocol error code if the error carries none:
	connection_error!(protocol_error.code || PROTOCOL_ERROR, protocol_error.message)
	
	raise
rescue HPACK::CompressionError => compression_error
	connection_error!(COMPRESSION_ERROR, compression_error.message)
	
	raise
rescue => unexpected_error
	# Any other failure is reported to the peer as a protocol error:
	connection_error!(PROTOCOL_ERROR, unexpected_error.message)
	
	raise
end

#receive_continuation(frame) ⇒ Object

Raises:



352
353
354
# File 'lib/protocol/http2/connection.rb', line 352

# Reject a continuation frame delivered directly to the connection.
#
# @param frame the unexpected frame.
# @raise [ProtocolError] always.
def receive_continuation(frame)
	frame_class = frame.class
	
	raise ProtocolError, "Received unexpected continuation: #{frame_class}"
end

#receive_data(frame) ⇒ Object



263
264
265
266
267
268
269
270
271
# File 'lib/protocol/http2/connection.rb', line 263

# Handle a DATA frame: account for it in the connection's local window, then
# pass it to the stream it belongs to.
#
# @param frame the received DATA frame.
# @raise [ProtocolError] if no stream with the frame's id is known.
def receive_data(frame)
	# The window is consumed before the stream lookup, even for unknown streams:
	consume_local_window(frame)
	
	stream = @streams[frame.stream_id]
	
	raise ProtocolError, "Cannot receive data for idle stream #{frame.stream_id}" unless stream
	
	stream.receive_data(frame)
end

#receive_frame(frame) ⇒ Object



356
357
358
# File 'lib/protocol/http2/connection.rb', line 356

# Fallback handler for frames with no dedicated handler: they are ignored.
#
# @param frame the received frame (unused).
# @return [nil]
def receive_frame(frame)
	# Deliberately a no-op.
	nil
end

#receive_goaway(frame) ⇒ Object



162
163
164
165
166
167
168
169
170
# File 'lib/protocol/http2/connection.rb', line 162

# Handle a GOAWAY frame from the peer.
#
# The connection is marked :closed before the frame is examined, so it is
# closed even when an error is raised.
#
# @param frame the received GOAWAY frame.
# @raise [GoawayError] if the frame carries a non-zero error code.
def receive_goaway(frame)
	@state = :closed
	
	# The last stream id is part of the payload but is not used here:
	_last_stream_id, error_code, message = frame.unpack
	
	raise GoawayError.new(message, error_code) if error_code != 0
end

#receive_headers(frame) ⇒ Object



296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
# File 'lib/protocol/http2/connection.rb', line 296

# Handle a HEADERS frame, accepting a new remote stream when needed.
#
# Frames for a known stream go straight to that stream. Otherwise a new
# stream is accepted, as long as the number of streams in the table is below
# the concurrent stream limit.
#
# @param frame the received HEADERS frame.
# @raise [ProtocolError] if the frame targets stream 0 or the limit is reached.
def receive_headers(frame)
	raise ProtocolError, "Cannot receive headers for stream 0!" if frame.stream_id == 0
	
	stream = @streams[frame.stream_id]
	
	unless stream
		# Every entry in the stream table counts toward the limit:
		unless @streams.count < self.maximum_concurrent_streams
			raise ProtocolError, "Exceeded maximum concurrent streams"
		end
		
		stream = accept_stream(frame.stream_id)
	end
	
	stream.receive_headers(frame)
end

#receive_ping(frame) ⇒ Object



247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
# File 'lib/protocol/http2/connection.rb', line 247

# Handle a PING frame, replying with an acknowledgement unless the frame is
# itself an acknowledgement.
#
# @param frame the received PING frame.
# @raise [ProtocolError] if the connection is closed or the frame is not on
#   stream 0.
def receive_ping(frame)
	raise ProtocolError, "Cannot receive ping in state #{@state}" if @state == :closed
	raise ProtocolError, "Ping received for non-zero stream!" if frame.stream_id != 0
	
	# An acknowledgement needs no reply:
	return if frame.acknowledgement?
	
	write_frame(frame.acknowledge)
end

#receive_priority(frame) ⇒ Object



313
314
315
316
317
318
319
320
# File 'lib/protocol/http2/connection.rb', line 313

# Handle a PRIORITY frame. If the stream is not known yet it is accepted
# first, then the frame is passed to the stream.
#
# @param frame the received PRIORITY frame.
def receive_priority(frame)
	stream = @streams[frame.stream_id] || accept_stream(frame.stream_id)
	
	stream.receive_priority(frame)
end

#receive_reset_stream(frame) ⇒ Object



322
323
324
325
326
327
328
# File 'lib/protocol/http2/connection.rb', line 322

# Handle a RST_STREAM frame by passing it to the stream it targets.
#
# @param frame the received RST_STREAM frame.
# @raise [StreamClosed] if no stream with the frame's id is known.
def receive_reset_stream(frame)
	stream = @streams[frame.stream_id]
	
	raise StreamClosed, "Cannot reset stream #{frame.stream_id}" unless stream
	
	stream.receive_reset_stream(frame)
end

#receive_settings(frame) ⇒ Object



236
237
238
239
240
241
242
243
244
245
# File 'lib/protocol/http2/connection.rb', line 236

# Handle a SETTINGS frame according to the connection state.
#
# @param frame the received SETTINGS frame.
# @raise [ProtocolError] if the connection is closed.
def receive_settings(frame)
	case @state
	when :new
		# We transition to :open when we receive acknowledgement of first settings frame:
		open! if process_settings(frame)
	when :closed
		raise ProtocolError, "Cannot receive settings in state #{@state}"
	else
		process_settings(frame)
	end
end

#receive_window_update(frame) ⇒ Object



330
331
332
333
334
335
336
337
338
339
340
341
342
343
# File 'lib/protocol/http2/connection.rb', line 330

# Handle a WINDOW_UPDATE frame.
#
# Connection-level updates are delegated to the inherited implementation.
# For a stream-level update, a ProtocolError raised by the stream is answered
# by resetting that stream rather than propagated.
#
# @param frame the received WINDOW_UPDATE frame.
# @raise [ProtocolError] if the frame targets an unknown stream.
def receive_window_update(frame)
	return super if frame.connection?
	
	stream = @streams[frame.stream_id]
	
	unless stream
		# Receiving any frame other than HEADERS or PRIORITY on a stream in this state MUST be treated as a connection error of type PROTOCOL_ERROR.
		raise ProtocolError, "Cannot update window of idle stream #{frame.stream_id}"
	end
	
	begin
		stream.receive_window_update(frame)
	rescue ProtocolError => error
		# The failure is confined to this stream, so only the stream is reset:
		stream.send_reset_stream(error.code)
	end
end

#send_goaway(error_code = 0, message = "") ⇒ Object



153
154
155
156
157
158
159
160
# File 'lib/protocol/http2/connection.rb', line 153

# Send a GOAWAY frame to the peer and mark the connection :closed.
#
# The frame carries the highest remote stream id accepted so far.
#
# @param error_code the error code to report; 0 by default.
# @param message the debug message to include; empty by default.
def send_goaway(error_code = 0, message = "")
	goaway = GoawayFrame.new
	goaway.pack(@remote_stream_id, error_code, message)
	
	write_frame(goaway)
	
	# The state only changes once the frame has been written:
	@state = :closed
end

#send_ping(data) ⇒ Object



177
178
179
180
181
182
183
184
185
186
# File 'lib/protocol/http2/connection.rb', line 177

# Send a PING frame carrying the given payload.
#
# @param data the payload to pack into the PING frame.
# @raise [ProtocolError] if the connection is closed.
def send_ping(data)
	raise ProtocolError, "Cannot send ping in state #{@state}" if @state == :closed
	
	ping = PingFrame.new
	ping.pack(data)
	
	write_frame(ping)
end

#send_settings(changes) ⇒ Object



144
145
146
147
148
149
150
151
# File 'lib/protocol/http2/connection.rb', line 144

# Send a SETTINGS frame with the given changes.
#
# The changes are first appended to the local settings, where they wait for
# the peer's acknowledgement (see #process_settings).
#
# @param changes the setting changes to send.
def send_settings(changes)
	@local_settings.append(changes)
	
	settings_frame = SettingsFrame.new
	settings_frame.pack(changes)
	
	write_frame(settings_frame)
end

#update_local_settings(changes) ⇒ Object



188
189
190
191
192
193
194
# File 'lib/protocol/http2/connection.rb', line 188

# Set the capacity of every stream's local window to the current local
# initial window size.
#
# @param changes the acknowledged changes (not inspected here).
def update_local_settings(changes)
	window_size = @local_settings.initial_window_size
	
	@streams.each_value { |stream| stream.local_window.capacity = window_size }
end

#update_remote_settings(changes) ⇒ Object



196
197
198
199
200
201
202
# File 'lib/protocol/http2/connection.rb', line 196

# Set the capacity of every stream's remote window to the current remote
# initial window size.
#
# @param changes the received changes (not inspected here).
def update_remote_settings(changes)
	window_size = @remote_settings.initial_window_size
	
	@streams.each_value { |stream| stream.remote_window.capacity = window_size }
end

#valid_remote_stream_id?Boolean

Returns:

  • (Boolean)


273
274
275
# File 'lib/protocol/http2/connection.rb', line 273

# Whether the given id is acceptable for a stream opened by the remote peer.
#
# The base connection accepts no remote streams. Presumably the Client and
# Server subclasses override this with their own rule (confirm against those
# classes).
#
# #accept_stream calls this with the candidate stream id, so an argument must
# be accepted. It is optional so that existing zero-argument calls keep
# working.
#
# @param stream_id the candidate stream id (ignored here).
# @return [Boolean] always false.
def valid_remote_stream_id?(stream_id = nil)
	false
end

#window_updatedObject



345
346
347
348
349
350
# File 'lib/protocol/http2/connection.rb', line 345

# Tell every stream that is not closed that a window has been updated.
def window_updated
	# This is very inefficient, but workable.
	@streams.each_value do |stream|
		next if stream.closed?
		
		stream.window_updated
	end
end

#write_frame(frame) ⇒ Object



172
173
174
175
# File 'lib/protocol/http2/connection.rb', line 172

# Write a frame to the peer through the framer.
#
# @param frame the frame to write.
# @return whatever the framer's #write_frame returns.
def write_frame(frame)
	# puts "#{self.class} #{@state} write_frame: class=#{frame.class} flags=#{frame.flags} length=#{frame.length}"
	return @framer.write_frame(frame)
end