Class: Async::HTTP::Protocol::HTTP2::Stream

Inherits:
Protocol::HTTP2::Stream
  • Object
show all
Defined in:
lib/async/http/protocol/http2/stream.rb

Direct Known Subclasses

Request::Stream, Response::Stream

Defined Under Namespace

Classes: Input, Output

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Stream

Returns a new instance of Stream.



163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/async/http/protocol/http2/stream.rb', line 163

# Create a stream with no headers, trailers or bodies attached yet.
#
# Every argument is forwarded untouched to the superclass initializer.
def initialize(*)
	super
	
	# Header state, both unset until headers are processed:
	@headers, @trailers = nil, nil
	
	# Receiving side: the request body or response body being read (receive_data):
	@length, @input = nil, nil
	
	# Sending side: the request body or response body being written (window_updated):
	@output = nil
end

Instance Attribute Details

#headers ⇒ Object

Returns the value of attribute headers.



177
178
179
# File 'lib/async/http/protocol/http2/stream.rb', line 177

# Returns the value of the `@headers` attribute.
#
# The constructor sets it to nil; `process_headers` replaces it with a
# headers collection when the first headers frame is handled.
def headers
  @headers
end

#input ⇒ Object (readonly)

Returns the value of attribute input.



179
180
181
# File 'lib/async/http/protocol/http2/stream.rb', line 179

# Returns the value of the read-only `@input` attribute.
#
# It is nil until `prepare_input` assigns an input body, and is reset to
# nil again by `close` and by `process_data` on the final data frame.
def input
  @input
end

Instance Method Details

#add_header(key, value) ⇒ Object



181
182
183
184
185
186
187
188
189
190
191
# File 'lib/async/http/protocol/http2/stream.rb', line 181

# Validate a header and append it to `@headers`.
#
# @param key the header name.
# @param value the header value.
# @raise [::Protocol::HTTP2::HeaderError] if the name equals CONNECTION,
#   starts with ':' (a pseudo-header), or contains an upper-case ASCII letter.
def add_header(key, value)
	raise ::Protocol::HTTP2::HeaderError, "Connection header is not allowed!" if key == CONNECTION
	raise ::Protocol::HTTP2::HeaderError, "Invalid pseudo-header #{key}!" if key.start_with?(':')
	raise ::Protocol::HTTP2::HeaderError, "Invalid upper-case characters in header #{key}!" if key =~ /[A-Z]/
	
	# Passed every check, so record it:
	@headers.add(key, value)
end

#add_trailer(key, value) ⇒ Object



193
194
195
196
197
198
199
# File 'lib/async/http/protocol/http2/stream.rb', line 193

# Add a header that arrived in a trailing headers frame.
#
# The header is only accepted when its name is present in `@trailers`
# (assigned from `@headers[TRAILERS]` in `process_headers`); accepted
# trailers go through the same validation as ordinary headers via `add_header`.
#
# @param key the trailer name.
# @param value the trailer value.
# @raise [::Protocol::HTTP2::HeaderError] if the trailer was not announced,
#   including when no trailers were announced at all (`@trailers` is nil).
def add_trailer(key, value)
	# Membership must be tested with the `include?` predicate; the safe
	# navigation makes a nil `@trailers` reject the trailer with a HeaderError.
	if @trailers&.include?(key)
		add_header(key, value)
	else
		raise ::Protocol::HTTP2::HeaderError, "Cannot add trailer #{key} as it was not specified in trailers!"
	end
end

#close(error = nil) ⇒ Object



276
277
278
279
280
281
282
283
284
285
286
287
288
# File 'lib/async/http/protocol/http2/stream.rb', line 276

# Close the stream and release any bodies still attached to it.
#
# After the superclass has handled the close, the input body (if any) is
# closed and the output body (if any) is stopped; both receive `error`.
#
# @param error the reason for closing, or nil.
def close(error = nil)
	super
	
	# Shut down the receiving side:
	if (reader = @input)
		reader.close(error)
		@input = nil
	end
	
	# Shut down the sending side:
	if (writer = @output)
		writer.stop(error)
		@output = nil
	end
end

#prepare_input(length) ⇒ Input

Prepare the input stream which will be used for incoming data frames.

Returns:

  • (Input)

    the input body.



229
230
231
232
233
234
235
# File 'lib/async/http/protocol/http2/stream.rb', line 229

# Prepare the input stream which will be used for incoming data frames.
#
# @param length passed through to `Input.new` together with this stream.
# @return [Input] the input body.
# @raise [ArgumentError] if an input body has already been prepared.
def prepare_input(length)
	raise ArgumentError, "Input body already prepared!" unless @input.nil?
	
	@input = Input.new(self, length)
end

#process_data(frame) ⇒ Object



244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/async/http/protocol/http2/stream.rb', line 244

# Handle an incoming data frame.
#
# The unpacked payload is written to the input body when one is attached
# and the payload is non-empty. A frame flagged as end of stream closes the
# input body and detaches it.
#
# @param frame the data frame; must respond to `unpack` and `end_stream?`.
# @return the unpacked payload, or the result of `send_reset_stream` when an
#   unexpected error was rescued.
# @raise [::Protocol::HTTP2::ProtocolError] re-raised unchanged.
def process_data(frame)
	data = frame.unpack
	
	if @input
		@input.write(data) unless data.empty?
		
		# Final frame: nothing more will arrive for this body.
		if frame.end_stream?
			@input.close
			@input = nil
		end
	end
	
	data
rescue ::Protocol::HTTP2::ProtocolError
	# Protocol violations are not handled here:
	raise
rescue
	# Any other failure resets the stream with an internal error code:
	send_reset_stream(::Protocol::HTTP2::Error::INTERNAL_ERROR)
end

#process_headers(frame) ⇒ Object



207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# File 'lib/async/http/protocol/http2/stream.rb', line 207

# Handle an incoming headers frame.
#
# The first frame creates the headers collection, hands the superclass
# result to `receive_initial_headers`, then records `@headers[TRAILERS]` in
# `@trailers`. A later frame is accepted only when trailers were recorded and
# the frame ends the stream; it goes to `receive_trailing_headers`.
#
# A `HeaderError` raised along the way is logged and answered by resetting
# the stream with the error's code.
#
# @param frame the headers frame; must respond to `end_stream?`.
def process_headers(frame)
	if @headers.nil?
		# Initial headers:
		@headers = ::Protocol::HTTP::Headers.new
		receive_initial_headers(super, frame.end_stream?)
		@trailers = @headers[TRAILERS]
	elsif @trailers && frame.end_stream?
		# Trailing headers:
		receive_trailing_headers(super, frame.end_stream?)
	else
		raise ::Protocol::HTTP2::HeaderError, "Unable to process headers!"
	end
rescue ::Protocol::HTTP2::HeaderError => error
	Async.logger.error(self, error)
	
	send_reset_stream(error.code)
end

#receive_trailing_headers(headers, end_stream) ⇒ Object



201
202
203
204
205
# File 'lib/async/http/protocol/http2/stream.rb', line 201

# Feed every trailing header through `add_trailer`.
#
# @param headers an enumerable yielding name/value pairs.
# @param end_stream not used by this implementation.
def receive_trailing_headers(headers, end_stream)
	headers.each { |name, content| add_trailer(name, content) }
end

#send_body(body) ⇒ Object

Set the body and begin sending it.



266
267
268
# File 'lib/async/http/protocol/http2/stream.rb', line 266

# Set the body and begin sending it.
#
# The output is built by `Output.for(self, body)` and kept in `@output`,
# where `window_updated` and `close` pick it up.
#
# @param body the body to send.
# @return the value returned by `Output.for`.
def send_body(body)
	@output = Output.for(self, body)
end

#update_local_window(frame) ⇒ Object



237
238
239
240
241
242
# File 'lib/async/http/protocol/http2/stream.rb', line 237

# Delegates to `consume_local_window(frame)` without requesting a window
# update afterwards.
#
# @param frame the frame handed to `consume_local_window`.
def update_local_window(frame)
	consume_local_window(frame)
	
	# This is done on demand in `Input#read`:
	# request_window_update
end

#wait_for_input ⇒ Object



223
224
225
# File 'lib/async/http/protocol/http2/stream.rb', line 223

# Returns the current input body, or nil when none is attached.
#
# This implementation returns `@input` immediately.
def wait_for_input
	@input
end

#window_updated(size) ⇒ Object



270
271
272
273
274
# File 'lib/async/http/protocol/http2/stream.rb', line 270

# Called when the flow-control window has been updated.
#
# After the superclass has handled it, the notification is forwarded to
# the output body, if one is attached.
#
# @param size passed on unchanged to the output's `window_updated`.
# @return the output's result, or nil when there is no output.
def window_updated(size)
	super
	
	@output.window_updated(size) unless @output.nil?
end