Class: HTTP::Protocol::HTTP2::Connection

Inherits:
Object
  • Object
show all
Includes:
FlowControl
Defined in:
lib/http/protocol/http2/connection.rb

Direct Known Subclasses

Client, Server

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from FlowControl

#available_frame_size, #consume_local_window, #consume_remote_window, #send_window_update

Constructor Details

#initialize(framer, local_stream_id) ⇒ Connection

Returns a new instance of Connection.



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/http/protocol/http2/connection.rb', line 34

def initialize(framer, local_stream_id)
	@state = :new
	@streams = {}
	
	@framer = framer
	@local_stream_id = local_stream_id
	@remote_stream_id = 0
	
	@local_settings = PendingSettings.new
	@remote_settings = Settings.new
	
	@decoder = HPACK::Context.new
	@encoder = HPACK::Context.new
	
	@local_window = Window.new(@local_settings.initial_window_size)
	@remote_window = Window.new(@remote_settings.initial_window_size)
end

Instance Attribute Details

#framerObject (readonly)

Returns the value of attribute framer.



64
65
66
# File 'lib/http/protocol/http2/connection.rb', line 64

def framer
  @framer
end

#local_settingsObject

Current settings value for local and peer



70
71
72
# File 'lib/http/protocol/http2/connection.rb', line 70

def local_settings
  @local_settings
end

#local_windowObject (readonly)

Our window for receiving data. When we receive data, it reduces this window. If the window gets too small, we must send a window update.



75
76
77
# File 'lib/http/protocol/http2/connection.rb', line 75

def local_window
  @local_window
end

#remote_settingsObject

Returns the value of attribute remote_settings.



71
72
73
# File 'lib/http/protocol/http2/connection.rb', line 71

def remote_settings
  @remote_settings
end

#remote_windowObject (readonly)

Our window for sending data. When we send data, it reduces this window.



78
79
80
# File 'lib/http/protocol/http2/connection.rb', line 78

def remote_window
  @remote_window
end

#stateObject

Connection state (:new, :open, :closed).



67
68
69
# File 'lib/http/protocol/http2/connection.rb', line 67

def state
  @state
end

#streamsObject (readonly)

Returns the value of attribute streams.



109
110
111
# File 'lib/http/protocol/http2/connection.rb', line 109

def streams
  @streams
end

Instance Method Details

#closeObject



84
85
86
87
88
# File 'lib/http/protocol/http2/connection.rb', line 84

def close
	send_goaway
	
	@framer.close
end

#closed?Boolean

Returns:

  • (Boolean)


80
81
82
# File 'lib/http/protocol/http2/connection.rb', line 80

def closed?
	@state == :closed
end

#create_stream(stream_id = next_stream_id) ⇒ Object



258
259
260
# File 'lib/http/protocol/http2/connection.rb', line 258

def create_stream(stream_id = next_stream_id)
	Stream.new(self, stream_id)
end

#decode_headers(data) ⇒ Object



96
97
98
# File 'lib/http/protocol/http2/connection.rb', line 96

def decode_headers(data)
	HPACK::Decompressor.new(data, @decoder).decode
end

#deleted_stream?(frame) ⇒ Boolean

Returns:

  • (Boolean)


286
287
288
# File 'lib/http/protocol/http2/connection.rb', line 286

def deleted_stream? frame
	frame.stream_id <= @local_stream_id or frame.stream_id <= @remote_stream_id
end

#encode_headers(headers, buffer = String.new.b) ⇒ Object



90
91
92
93
94
# File 'lib/http/protocol/http2/connection.rb', line 90

def encode_headers(headers, buffer = String.new.b)
	HPACK::Compressor.new(buffer, @encoder).encode(headers)
	
	return buffer
end

#idObject



52
53
54
# File 'lib/http/protocol/http2/connection.rb', line 52

def id
	0
end

#maximum_concurrent_streamsObject



60
61
62
# File 'lib/http/protocol/http2/connection.rb', line 60

def maximum_concurrent_streams
	[@local_settings.maximum_concurrent_streams, @remote_settings.maximum_concurrent_streams].min
end

#maximum_frame_sizeObject



56
57
58
# File 'lib/http/protocol/http2/connection.rb', line 56

def maximum_frame_size
	@remote_settings.maximum_frame_size
end

#next_stream_idObject

Streams are identified with an unsigned 31-bit integer. Streams initiated by a client MUST use odd-numbered stream identifiers; those initiated by the server MUST use even-numbered stream identifiers. A stream identifier of zero (0x0) is used for connection control messages; the stream identifier of zero cannot be used to establish a new stream.



101
102
103
104
105
106
107
# File 'lib/http/protocol/http2/connection.rb', line 101

def next_stream_id
	id = @local_stream_id
	
	@local_stream_id += 2
	
	return id
end

#open!Object



215
216
217
218
219
# File 'lib/http/protocol/http2/connection.rb', line 215

def open!
	@state = :open
	
	return self
end

#process_settings(frame) ⇒ Boolean

In addition to changing the flow-control window for streams that are not yet active, a SETTINGS frame can alter the initial flow-control window size for streams with active flow-control windows (that is, streams in the “open” or “half-closed (remote)” state). When the value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the size of all stream flow-control windows that it maintains by the difference between the new value and the old value.

Returns:

  • (Boolean)

    whether the frame was an acknowledgement



192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/http/protocol/http2/connection.rb', line 192

def process_settings(frame)
	if frame.acknowledgement?
		# The remote end has confirmed the settings have been received:
		changes = @local_settings.acknowledge
		
		update_local_settings(changes)
		
		return true
	else
		# The remote end is updating the settings, we reply with acknowledgement:
		reply = frame.acknowledge
		
		write_frame(reply)
		
		changes = frame.unpack
		@remote_settings.update(changes)
		
		update_remote_settings(changes)
		
		return false
	end
end

#read_frameObject



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/http/protocol/http2/connection.rb', line 111

def read_frame
	frame = @framer.read_frame(@local_settings.maximum_frame_size)
	# puts "#{self.class} #{@state} read_frame: class=#{frame.class} flags=#{frame.flags} length=#{frame.length}"
	# puts "Windows: local_window=#{@local_window.inspect}; remote_window=#{@remote_window.inspect}"
	
	yield frame if block_given?
	
	frame.apply(self)
	
	return frame
rescue ProtocolError => error
	send_goaway(error.code || PROTOCOL_ERROR, error.message)
	
	raise
rescue HTTP::HPACK::CompressionError => error
	send_goaway(COMPRESSION_ERROR, error.message)
	
	raise
rescue
	send_goaway(PROTOCOL_ERROR, $!.message)
	
	raise
end

#receive_data(frame) ⇒ Object



244
245
246
247
248
249
250
251
252
253
254
255
256
# File 'lib/http/protocol/http2/connection.rb', line 244

def receive_data(frame)
	consume_local_window(frame)
	
	if stream = @streams[frame.stream_id]
		stream.receive_data(frame)
		
		if stream.closed?
			@streams.delete(stream.id)
		end
	else
		raise ProtocolError, "Bad stream"
	end
end

#receive_frame(frame) ⇒ Object



334
335
336
# File 'lib/http/protocol/http2/connection.rb', line 334

def receive_frame(frame)
	warn "Unhandled frame #{frame.inspect}"
end

#receive_goaway(frame) ⇒ Object



153
154
155
# File 'lib/http/protocol/http2/connection.rb', line 153

def receive_goaway(frame)
	@state = :closed
end

#receive_headers(frame) ⇒ Object



262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
# File 'lib/http/protocol/http2/connection.rb', line 262

def receive_headers(frame)
	if frame.stream_id == 0
		raise ProtocolError, "Cannot receive headers for stream 0!"
	end
	
	if stream = @streams[frame.stream_id]
		stream.receive_headers(frame)
		
		if stream.closed?
			@streams.delete(stream.id)
		end
	elsif frame.stream_id > @remote_stream_id
		if @streams.count < self.maximum_concurrent_streams
			stream = create_stream(frame.stream_id)
			stream.receive_headers(frame)
			
			@remote_stream_id = stream.id
			@streams[stream.id] = stream
		else
			raise ProtocolError, "Exceeded maximum concurrent streams"
		end
	end
end

#receive_ping(frame) ⇒ Object



232
233
234
235
236
237
238
239
240
241
242
# File 'lib/http/protocol/http2/connection.rb', line 232

def receive_ping(frame)
	if @state != :closed
		unless frame.acknowledgement?
			reply = frame.acknowledge
			
			write_frame(reply)
		end
	else
		raise ProtocolError, "Cannot receive ping in state #{@state}"
	end
end

#receive_priority(frame) ⇒ Object



290
291
292
293
294
295
296
297
298
299
300
301
# File 'lib/http/protocol/http2/connection.rb', line 290

def receive_priority(frame)
	if stream = @streams[frame.stream_id]
		stream.receive_priority(frame)
	elsif deleted_stream? frame
		# ignore
	else
		stream = create_stream(frame.stream_id)
		stream.receive_priority(frame)
		
		@streams[frame.stream_id] = stream
	end
end

#receive_reset_stream(frame) ⇒ Object



303
304
305
306
307
308
309
310
311
312
313
# File 'lib/http/protocol/http2/connection.rb', line 303

def receive_reset_stream(frame)
	if stream = @streams[frame.stream_id]
		stream.receive_reset_stream(frame)
		
		@streams.delete(stream.id)
	elsif deleted_stream? frame
		# ignore
	else
		raise ProtocolError, "Bad stream"
	end
end

#receive_settings(frame) ⇒ Object



221
222
223
224
225
226
227
228
229
230
# File 'lib/http/protocol/http2/connection.rb', line 221

def receive_settings(frame)
	if @state == :new
		# We transition to :open when we receive acknowledgement of first settings frame:
		open! if process_settings(frame)
	elsif @state != :closed
		process_settings(frame)
	else
		raise ProtocolError, "Cannot receive settings in state #{@state}"
	end
end

#receive_window_update(frame) ⇒ Object



315
316
317
318
319
320
321
322
323
324
325
# File 'lib/http/protocol/http2/connection.rb', line 315

def receive_window_update(frame)
	if frame.connection?
		super
	elsif stream = @streams[frame.stream_id]
		stream.receive_window_update(frame)
	elsif deleted_stream? frame
		# ignore
	else
		raise ProtocolError, "Cannot update window of non-existant stream: #{frame.stream_id}"
	end
end

#send_goaway(error_code = 0, message = "") ⇒ Object



144
145
146
147
148
149
150
151
# File 'lib/http/protocol/http2/connection.rb', line 144

def send_goaway(error_code = 0, message = "")
	frame = GoawayFrame.new
	frame.pack @remote_stream_id, error_code, message
	
	write_frame(frame)
	
	@state = :closed
end

#send_ping(data) ⇒ Object



162
163
164
165
166
167
168
169
170
171
# File 'lib/http/protocol/http2/connection.rb', line 162

def send_ping(data)
	if @state != :closed
		frame = PingFrame.new
		frame.pack data
		
		write_frame(frame)
	else
		raise ProtocolError, "Cannot send ping in state #{@state}"
	end
end

#send_settings(changes) ⇒ Object



135
136
137
138
139
140
141
142
# File 'lib/http/protocol/http2/connection.rb', line 135

def send_settings(changes)
	@local_settings.append(changes)
	
	frame = SettingsFrame.new
	frame.pack(changes)
	
	write_frame(frame)
end

#update_local_settings(changes) ⇒ Object



173
174
175
176
177
178
179
# File 'lib/http/protocol/http2/connection.rb', line 173

def update_local_settings(changes)
	capacity = @local_settings.initial_window_size
	
	@streams.each_value do |stream|
		stream.local_window.capacity = capacity
	end
end

#update_remote_settings(changes) ⇒ Object



181
182
183
184
185
186
187
# File 'lib/http/protocol/http2/connection.rb', line 181

def update_remote_settings(changes)
	capacity = @remote_settings.initial_window_size
	
	@streams.each_value do |stream|
		stream.remote_window.capacity = capacity
	end
end

#window_updatedObject



327
328
329
330
331
332
# File 'lib/http/protocol/http2/connection.rb', line 327

def window_updated
	# This is very inefficient, but workable.
	@streams.each_value do |stream|
		stream.window_updated unless stream.closed?
	end
end

#write_frame(frame) ⇒ Object



157
158
159
160
# File 'lib/http/protocol/http2/connection.rb', line 157

def write_frame(frame)
	# puts "#{self.class} #{@state} write_frame: class=#{frame.class} flags=#{frame.flags} length=#{frame.length}"
	@framer.write_frame(frame)
end