Class: Async::HTTP::Protocol::HTTP2::Stream::Output

Inherits:
Object
  • Object
show all
Defined in:
lib/async/http/protocol/http2/stream.rb

Instance Method Summary collapse

Constructor Details

#initialize(stream, body, task: Task.current) ⇒ Output

Returns a new instance of Output.



47
48
49
50
51
52
53
54
55
56
# File 'lib/async/http/protocol/http2/stream.rb', line 47

# Set up the output and immediately schedule {#passthrough} on the given task.
#
# @param stream [Object] the stream that data frames are written to.
# @param body [Object] the body that chunks are read from.
# @param task [Object] the task used to schedule the pump via `async`; defaults to `Task.current`.
def initialize(stream, body, task: Task.current)
	@stream, @body, @remainder = stream, body, nil
	
	# Waited on by #passthrough and signalled by #window_updated.
	@window_updated = Async::Condition.new
	
	# Keep whatever `async` returns so that #close can stop it later.
	pump = method(:passthrough)
	@task = task.async(&pump)
end

Instance Method Details

#close(error) ⇒ Object



119
120
121
122
123
124
125
126
# File 'lib/async/http/protocol/http2/stream.rb', line 119

# Close the body (when one is still attached) and stop the pump task.
#
# @param error [Object, nil] forwarded unchanged to the body's `close`.
# @return [Object, nil] the result of `@task.stop`, or nil when there is no task.
def close(error)
	if (body = @body)
		body.close(error)
		# Drop the reference so a later close does not close the body twice.
		@body = nil
	end
	
	@task&.stop
end

#end_stream ⇒ Object



111
112
113
# File 'lib/async/http/protocol/http2/stream.rb', line 111

# Send a data frame with no payload and the `END_STREAM` flag set.
#
# @return [Object] whatever the stream's `send_data` returns.
def end_stream
	flags = ::Protocol::HTTP2::END_STREAM
	
	@stream.send_data(nil, flags)
end

#passthrough(task) ⇒ Object

Reads chunks from the given body and writes them to the stream as fast as possible.



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/async/http/protocol/http2/stream.rb', line 59

# Pump chunks from {#read} into the stream until the source is exhausted, then end the stream.
#
# Before each chunk is sent, waits on `@window_updated` until the stream reports a positive
# available frame size. `Async::Stop` is swallowed; any other exception propagates after the
# body has been closed.
#
# @param task [Object] the task running this method; the body does not use it.
def passthrough(task)
	while (chunk = read)
		# Re-check the available size after every wake-up; one signal may not free enough room.
		while (capacity = @stream.available_frame_size) <= 0
			@window_updated.wait
		end
		
		send_data(chunk, capacity)
	end
	
	end_stream
rescue Async::Stop
	# Deliberately swallowed: being stopped is not treated as a failure here.
ensure
	# `$!` carries the exception currently propagating (if any) to the body's `close`.
	@body&.close($!)
	@body = nil
end

#push(chunk) ⇒ Object



91
92
93
# File 'lib/async/http/protocol/http2/stream.rb', line 91

# Store `chunk` as the pending remainder, replacing any remainder already held.
# The next call to #read returns this value instead of reading from the body.
#
# @param chunk [Object] the data to hand back on the next read.
# @return [Object] the given chunk.
def push(chunk)
	@remainder = chunk
end

#read ⇒ Object



80
81
82
83
84
85
86
87
88
89
# File 'lib/async/http/protocol/http2/stream.rb', line 80

# Return the next chunk to send: the pending remainder when one is held, otherwise the
# next read from the body.
#
# @return [Object, nil] the chunk, or nil when there is no remainder and no body.
def read
	return @body&.read unless @remainder
	
	# Hand the remainder out exactly once.
	pending, @remainder = @remainder, nil
	pending
end

#send_data(chunk, maximum_size) ⇒ Object

Send `maximum_size` bytes of data using the specified `stream`. If the buffer has no more chunks, `END_STREAM` will be sent on the final chunk.

Parameters:

  • maximum_size (Integer)

    send up to this many bytes of data.

  • stream (Stream)

    the stream to use for sending data frames.



98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/async/http/protocol/http2/stream.rb', line 98

# Write at most `maximum_size` bytes of `chunk` to the stream. When the chunk is larger,
# only the leading bytes are sent and the rest is stored via #push for a later read.
#
# @param chunk [String] the data to send.
# @param maximum_size [Integer] send up to this many bytes of data.
# @return [Object] the stream's `send_data` result when the whole chunk fits, otherwise
#   the result of #push.
def send_data(chunk, maximum_size)
	fits = chunk.bytesize <= maximum_size
	return @stream.send_data(chunk, maximum_size: maximum_size) if fits
	
	head = chunk.byteslice(0, maximum_size)
	@stream.send_data(head, maximum_size: maximum_size)
	
	# The window could not take everything; keep the unsent tail for next time.
	tail_size = chunk.bytesize - maximum_size
	push(chunk.byteslice(maximum_size, tail_size))
end

#window_updated(size) ⇒ Object



115
116
117
# File 'lib/async/http/protocol/http2/stream.rb', line 115

# Signal the `@window_updated` condition, which #passthrough waits on while the stream's
# available frame size is not positive.
#
# @param size [Object] accepted but not used by this method.
# @return [Object] whatever the condition's `signal` returns.
def window_updated(size)
	@window_updated.signal
end