Class: Protocol::HTTP2::Connection

Inherits:
Object
  • Object
show all
Includes:
FlowControl
Defined in:
lib/protocol/http2/connection.rb

Direct Known Subclasses

Client, Server

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from FlowControl

#available_frame_size, #consume_local_window, #consume_remote_window, #send_window_update

Constructor Details

#initialize(framer, local_stream_id) ⇒ Connection

Returns a new instance of Connection.



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/protocol/http2/connection.rb', line 31

def initialize(framer, local_stream_id)
	@state = :new
	@streams = {}
	
	@framer = framer
	@local_stream_id = local_stream_id
	@remote_stream_id = 0
	
	@local_settings = PendingSettings.new
	@remote_settings = Settings.new
	
	@decoder = HPACK::Context.new
	@encoder = HPACK::Context.new
	
	@local_window = Window.new(@local_settings.initial_window_size)
	@remote_window = Window.new(@remote_settings.initial_window_size)
end

Instance Attribute Details

#framerObject (readonly)

Returns the value of attribute framer.



61
62
63
# File 'lib/protocol/http2/connection.rb', line 61

def framer
  @framer
end

#local_settingsObject

Current settings value for local and peer



67
68
69
# File 'lib/protocol/http2/connection.rb', line 67

def local_settings
  @local_settings
end

#local_windowObject (readonly)

Our window for receiving data. When we receive data, it reduces this window. If the window gets too small, we must send a window update.



72
73
74
# File 'lib/protocol/http2/connection.rb', line 72

def local_window
  @local_window
end

#remote_settingsObject

Returns the value of attribute remote_settings.



68
69
70
# File 'lib/protocol/http2/connection.rb', line 68

def remote_settings
  @remote_settings
end

#remote_windowObject (readonly)

Our window for sending data. When we send data, it reduces this window.



75
76
77
# File 'lib/protocol/http2/connection.rb', line 75

def remote_window
  @remote_window
end

#stateObject

Connection state (:new, :open, :closed).



64
65
66
# File 'lib/protocol/http2/connection.rb', line 64

def state
  @state
end

#streamsObject (readonly)

Returns the value of attribute streams.



106
107
108
# File 'lib/protocol/http2/connection.rb', line 106

def streams
  @streams
end

Instance Method Details

#closeObject



81
82
83
84
85
# File 'lib/protocol/http2/connection.rb', line 81

def close
	send_goaway
	
	@framer.close
end

#closed?Boolean

Returns:

  • (Boolean)


77
78
79
# File 'lib/protocol/http2/connection.rb', line 77

def closed?
	@state == :closed
end

#create_stream(stream_id = next_stream_id) ⇒ Object



261
262
263
# File 'lib/protocol/http2/connection.rb', line 261

def create_stream(stream_id = next_stream_id)
	Stream.new(self, stream_id)
end

#decode_headers(data) ⇒ Object



93
94
95
# File 'lib/protocol/http2/connection.rb', line 93

def decode_headers(data)
	HPACK::Decompressor.new(data, @decoder).decode
end

#deleted_stream?(frame) ⇒ Boolean

Returns:

  • (Boolean)


289
290
291
# File 'lib/protocol/http2/connection.rb', line 289

def deleted_stream? frame
	frame.stream_id <= @local_stream_id or frame.stream_id <= @remote_stream_id
end

#encode_headers(headers, buffer = String.new.b) ⇒ Object



87
88
89
90
91
# File 'lib/protocol/http2/connection.rb', line 87

def encode_headers(headers, buffer = String.new.b)
	HPACK::Compressor.new(buffer, @encoder).encode(headers)
	
	return buffer
end

#idObject



49
50
51
# File 'lib/protocol/http2/connection.rb', line 49

def id
	0
end

#maximum_concurrent_streamsObject



57
58
59
# File 'lib/protocol/http2/connection.rb', line 57

def maximum_concurrent_streams
	[@local_settings.maximum_concurrent_streams, @remote_settings.maximum_concurrent_streams].min
end

#maximum_frame_sizeObject



53
54
55
# File 'lib/protocol/http2/connection.rb', line 53

def maximum_frame_size
	@remote_settings.maximum_frame_size
end

#next_stream_idObject

Streams are identified with an unsigned 31-bit integer. Streams initiated by a client MUST use odd-numbered stream identifiers; those initiated by the server MUST use even-numbered stream identifiers. A stream identifier of zero (0x0) is used for connection control messages; the stream identifier of zero cannot be used to establish a new stream.



98
99
100
101
102
103
104
# File 'lib/protocol/http2/connection.rb', line 98

def next_stream_id
	id = @local_stream_id
	
	@local_stream_id += 2
	
	return id
end

#open!Object



218
219
220
221
222
# File 'lib/protocol/http2/connection.rb', line 218

def open!
	@state = :open
	
	return self
end

#process_settings(frame) ⇒ Boolean

In addition to changing the flow-control window for streams that are not yet active, a SETTINGS frame can alter the initial flow-control window size for streams with active flow-control windows (that is, streams in the “open” or “half-closed (remote)” state). When the value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the size of all stream flow-control windows that it maintains by the difference between the new value and the old value.

Returns:

  • (Boolean)

    whether the frame was an acknowledgement



195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/protocol/http2/connection.rb', line 195

def process_settings(frame)
	if frame.acknowledgement?
		# The remote end has confirmed the settings have been received:
		changes = @local_settings.acknowledge
		
		update_local_settings(changes)
		
		return true
	else
		# The remote end is updating the settings, we reply with acknowledgement:
		reply = frame.acknowledge
		
		write_frame(reply)
		
		changes = frame.unpack
		@remote_settings.update(changes)
		
		update_remote_settings(changes)
		
		return false
	end
end

#read_frameObject



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/protocol/http2/connection.rb', line 108

def read_frame
	frame = @framer.read_frame(@local_settings.maximum_frame_size)
	# puts "#{self.class} #{@state} read_frame: class=#{frame.class} flags=#{frame.flags} length=#{frame.length}"
	# puts "Windows: local_window=#{@local_window.inspect}; remote_window=#{@remote_window.inspect}"
	
	yield frame if block_given?
	
	frame.apply(self)
	
	return frame
rescue ProtocolError => error
	send_goaway(error.code || PROTOCOL_ERROR, error.message)
	
	raise
rescue HPACK::CompressionError => error
	send_goaway(COMPRESSION_ERROR, error.message)
	
	raise
rescue
	send_goaway(PROTOCOL_ERROR, $!.message)
	
	raise
end

#receive_data(frame) ⇒ Object



247
248
249
250
251
252
253
254
255
256
257
258
259
# File 'lib/protocol/http2/connection.rb', line 247

def receive_data(frame)
	consume_local_window(frame)
	
	if stream = @streams[frame.stream_id]
		stream.receive_data(frame)
		
		if stream.closed?
			@streams.delete(stream.id)
		end
	else
		raise ProtocolError, "Bad stream"
	end
end

#receive_frame(frame) ⇒ Object



337
338
339
# File 'lib/protocol/http2/connection.rb', line 337

def receive_frame(frame)
	warn "Unhandled frame #{frame.inspect}"
end

#receive_goaway(frame) ⇒ Object



150
151
152
153
154
155
156
157
158
# File 'lib/protocol/http2/connection.rb', line 150

def receive_goaway(frame)
	@state = :closed
	
	last_stream_id, error_code, message = frame.unpack
	
	if error_code != 0
		raise GoawayError.new message, error_code
	end
end

#receive_headers(frame) ⇒ Object



265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
# File 'lib/protocol/http2/connection.rb', line 265

def receive_headers(frame)
	if frame.stream_id == 0
		raise ProtocolError, "Cannot receive headers for stream 0!"
	end
	
	if stream = @streams[frame.stream_id]
		stream.receive_headers(frame)
		
		if stream.closed?
			@streams.delete(stream.id)
		end
	elsif frame.stream_id > @remote_stream_id
		if @streams.count < self.maximum_concurrent_streams
			stream = create_stream(frame.stream_id)
			stream.receive_headers(frame)
			
			@remote_stream_id = stream.id
			@streams[stream.id] = stream
		else
			raise ProtocolError, "Exceeded maximum concurrent streams"
		end
	end
end

#receive_ping(frame) ⇒ Object



235
236
237
238
239
240
241
242
243
244
245
# File 'lib/protocol/http2/connection.rb', line 235

def receive_ping(frame)
	if @state != :closed
		unless frame.acknowledgement?
			reply = frame.acknowledge
			
			write_frame(reply)
		end
	else
		raise ProtocolError, "Cannot receive ping in state #{@state}"
	end
end

#receive_priority(frame) ⇒ Object



293
294
295
296
297
298
299
300
301
302
303
304
# File 'lib/protocol/http2/connection.rb', line 293

def receive_priority(frame)
	if stream = @streams[frame.stream_id]
		stream.receive_priority(frame)
	elsif deleted_stream? frame
		# ignore
	else
		stream = create_stream(frame.stream_id)
		stream.receive_priority(frame)
		
		@streams[frame.stream_id] = stream
	end
end

#receive_reset_stream(frame) ⇒ Object



306
307
308
309
310
311
312
313
314
315
316
# File 'lib/protocol/http2/connection.rb', line 306

def receive_reset_stream(frame)
	if stream = @streams[frame.stream_id]
		stream.receive_reset_stream(frame)
		
		@streams.delete(stream.id)
	elsif deleted_stream? frame
		# ignore
	else
		raise ProtocolError, "Bad stream"
	end
end

#receive_settings(frame) ⇒ Object



224
225
226
227
228
229
230
231
232
233
# File 'lib/protocol/http2/connection.rb', line 224

def receive_settings(frame)
	if @state == :new
		# We transition to :open when we receive acknowledgement of first settings frame:
		open! if process_settings(frame)
	elsif @state != :closed
		process_settings(frame)
	else
		raise ProtocolError, "Cannot receive settings in state #{@state}"
	end
end

#receive_window_update(frame) ⇒ Object



318
319
320
321
322
323
324
325
326
327
328
# File 'lib/protocol/http2/connection.rb', line 318

def receive_window_update(frame)
	if frame.connection?
		super
	elsif stream = @streams[frame.stream_id]
		stream.receive_window_update(frame)
	elsif deleted_stream? frame
		# ignore
	else
		raise ProtocolError, "Cannot update window of non-existant stream: #{frame.stream_id}"
	end
end

#send_goaway(error_code = 0, message = "") ⇒ Object



141
142
143
144
145
146
147
148
# File 'lib/protocol/http2/connection.rb', line 141

def send_goaway(error_code = 0, message = "")
	frame = GoawayFrame.new
	frame.pack @remote_stream_id, error_code, message
	
	write_frame(frame)
	
	@state = :closed
end

#send_ping(data) ⇒ Object



165
166
167
168
169
170
171
172
173
174
# File 'lib/protocol/http2/connection.rb', line 165

def send_ping(data)
	if @state != :closed
		frame = PingFrame.new
		frame.pack data
		
		write_frame(frame)
	else
		raise ProtocolError, "Cannot send ping in state #{@state}"
	end
end

#send_settings(changes) ⇒ Object



132
133
134
135
136
137
138
139
# File 'lib/protocol/http2/connection.rb', line 132

def send_settings(changes)
	@local_settings.append(changes)
	
	frame = SettingsFrame.new
	frame.pack(changes)
	
	write_frame(frame)
end

#update_local_settings(changes) ⇒ Object



176
177
178
179
180
181
182
# File 'lib/protocol/http2/connection.rb', line 176

def update_local_settings(changes)
	capacity = @local_settings.initial_window_size
	
	@streams.each_value do |stream|
		stream.local_window.capacity = capacity
	end
end

#update_remote_settings(changes) ⇒ Object



184
185
186
187
188
189
190
# File 'lib/protocol/http2/connection.rb', line 184

def update_remote_settings(changes)
	capacity = @remote_settings.initial_window_size
	
	@streams.each_value do |stream|
		stream.remote_window.capacity = capacity
	end
end

#window_updatedObject



330
331
332
333
334
335
# File 'lib/protocol/http2/connection.rb', line 330

def window_updated
	# This is very inefficient, but workable.
	@streams.each_value do |stream|
		stream.window_updated unless stream.closed?
	end
end

#write_frame(frame) ⇒ Object



160
161
162
163
# File 'lib/protocol/http2/connection.rb', line 160

def write_frame(frame)
	# puts "#{self.class} #{@state} write_frame: class=#{frame.class} flags=#{frame.flags} length=#{frame.length}"
	@framer.write_frame(frame)
end