Class: Async::HTTP::Protocol::HTTP2::Stream::Buffer

Inherits:
Object
  • Object
show all
Defined in:
lib/async/http/protocol/http2/stream.rb

Instance Method Summary collapse

Constructor Details

#initialize(stream, body, task: Task.current) ⇒ Buffer

Returns a new instance of Buffer.



29
30
31
32
33
34
35
36
37
38
# File 'lib/async/http/protocol/http2/stream.rb', line 29

# Set up the buffer and spawn the task that copies `body` to `stream`.
#
# @param stream [Stream] the destination; the writer calls `available_frame_size` and `send_data` on it.
# @param body [Object] the source of chunks; the writer calls `read` and `close` on it.
# @param task [Object] the parent whose `async` method spawns the writer. Defaults to `Task.current`.
def initialize(stream, body, task: Task.current)
	@stream, @body = stream, body
	
	# Unsent tail of a chunk that was too large for the previous frame (set by #push, consumed by #read).
	@remainder = nil
	
	# Signalled by #window_updated; #passthrough waits on it while no frame space is available.
	@window_updated = Async::Condition.new
	
	# Keep the handle returned by `async` so that #close can stop the writer.
	@task = task.async { |child| passthrough(child) }
end

Instance Method Details

#close(error) ⇒ Object



100
101
102
103
104
105
106
107
# File 'lib/async/http/protocol/http2/stream.rb', line 100

# Close the body (if it is still held) and stop the writer task.
#
# @param error [Exception, nil] passed through to the body's `close`.
# @return [Object, nil] whatever the task's `stop` returns, or nil when there is no task.
def close(error)
	body = @body
	
	if body
		body.close(error)
		
		# Drop the reference so the body is not closed a second time.
		@body = nil
	end
	
	@task&.stop
end

#end_stream ⇒ Object



92
93
94
# File 'lib/async/http/protocol/http2/stream.rb', line 92

# Tell the stream that no more data will follow.
#
# Calls `send_data` on the stream with no payload (nil) and the `END_STREAM` flag.
def end_stream
	flags = ::Protocol::HTTP2::END_STREAM
	
	@stream.send_data(nil, flags)
end

#passthrough(task) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/async/http/protocol/http2/stream.rb', line 40

# Copy the body to the stream: read chunks until none remain, wait for frame
# space before each send, then end the stream.
#
# `Async::Stop` is swallowed. However the method exits, the body (if still
# held) is closed and the reference to it is dropped.
#
# @param task [Object] the task running this method; not used by the body.
def passthrough(task)
	while (chunk = read)
		# Wait on the condition for as long as the stream reports no available frame space.
		@window_updated.wait while (capacity = @stream.available_frame_size) <= 0
		
		send_data(chunk, capacity)
	end
	
	end_stream
rescue Async::Stop
	# Ignore.
ensure
	# `$!` is Ruby's current-exception global, so the body is told about any error
	# that is propagating out of this method.
	@body&.close($!)
	@body = nil
end

#push(chunk) ⇒ Object



72
73
74
# File 'lib/async/http/protocol/http2/stream.rb', line 72

# Store `chunk` as the remainder, so the next call to #read returns it before
# consulting the body. Any previously stored remainder is overwritten.
#
# @param chunk [String, nil] the data to hand back on the next read.
# @return [String, nil] the same `chunk`.
def push(chunk)
	@remainder = chunk
	
	return chunk
end

#read ⇒ Object



61
62
63
64
65
66
67
68
69
70
# File 'lib/async/http/protocol/http2/stream.rb', line 61

# Fetch the next chunk to send.
#
# A remainder stored by #push takes priority and is cleared once returned;
# otherwise the chunk comes from the body.
#
# @return [Object, nil] the next chunk, or nil when there is no remainder and
# 	the body is unset or its `read` returns nil.
def read
	# No leftover data: defer to the body (which may already have been released).
	return @body&.read unless @remainder
	
	pending, @remainder = @remainder, nil
	
	pending
end

#send_data(chunk, maximum_size) ⇒ Object

Send up to `maximum_size` bytes of data using the specified `stream`. If the buffer has no more chunks, `END_STREAM` will be sent on the final chunk.

Parameters:

  • maximum_size (Integer)

    send up to this many bytes of data.

  • stream (Stream)

    the stream to use for sending data frames.



79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/async/http/protocol/http2/stream.rb', line 79

# Send `chunk` on the stream, limited to `maximum_size` bytes. Whatever does
# not fit is stored with #push so that the next #read returns it.
#
# @param chunk [String] the data to send; measured and sliced in bytes.
# @param maximum_size [Integer] send up to this many bytes of data.
# @return [Object] the stream's `send_data` result when the whole chunk fits,
# 	otherwise the result of #push.
def send_data(chunk, maximum_size)
	total = chunk.bytesize
	
	# The whole chunk fits: send it as is.
	return @stream.send_data(chunk, maximum_size: maximum_size) if total <= maximum_size
	
	head = chunk.byteslice(0, maximum_size)
	@stream.send_data(head, maximum_size: maximum_size)
	
	# The window was not big enough to send all the data, so we save the rest for next time:
	tail = chunk.byteslice(maximum_size, total - maximum_size)
	push(tail)
end

#window_updated(size) ⇒ Object



96
97
98
# File 'lib/async/http/protocol/http2/stream.rb', line 96

# Signal the condition that #passthrough waits on while the stream reports no
# available frame space.
#
# @param size [Integer] not used by this method.
# @return [Object] whatever the condition's `signal` returns.
def window_updated(size)
	condition = @window_updated
	
	condition.signal
end