Class: Protocol::HTTP::Body::Streamable::Body

Inherits:
Readable
  • Object
Defined in:
lib/protocol/http/body/streamable.rb

Overview

A streaming body that can be read from and written to.

Direct Known Subclasses

RequestBody, ResponseBody

Instance Method Summary

Methods inherited from Readable

#as_json, #buffered, #close, #discard, #each, #empty?, #finish, #join, #length, #ready?, #rewind, #rewindable?, #to_json

Constructor Details

#initialize(block, input = nil) ⇒ Body

Initialize the body with the given block and input.



# File 'lib/protocol/http/body/streamable.rb', line 90

def initialize(block, input = nil)
	@block = block
	@input = input
	@output = nil
end

Instance Method Details

#call(stream) ⇒ Object

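Invoke the block with the given stream. The block can read from and write to the stream, and must close the stream when it is finished. If the block raises an error, the stream is closed on its behalf and the error is re-raised. The block can be invoked only once; a later call raises ConsumedError.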



# File 'lib/protocol/http/body/streamable.rb', line 119

def call(stream)
	if @block.nil?
		raise ConsumedError, "Streaming block has already been consumed!"
	end
	
	block = @block
	
	@input = @output = @block = nil
	
	# Ownership of the stream is passed into the block, in other words, the block is responsible for closing the stream.
	block.call(stream)
rescue => error
	# If, for some reason, the block raises an error, we assume it may not have closed the stream, so we close it here:
	stream.close
	raise
end
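
The ownership rules in #call can be tried without the library. The following is a minimal sketch, not the library's own test code: `FakeStream` and `SketchBody` are hypothetical stand-ins, and `SketchBody` copies only #initialize and #call from the listing above (with a locally defined `ConsumedError`). It shows a block that closes the stream itself, a second call failing, and a raising block whose stream is closed by the rescue clause.

```ruby
# Hypothetical stand-in for the library's error class.
class ConsumedError < StandardError; end

# Hypothetical stream that records writes and whether it was closed.
class FakeStream
	attr_reader :chunks
	
	def initialize
		@chunks = []
		@closed = false
	end
	
	def write(chunk)
		@chunks << chunk
	end
	
	def close
		@closed = true
	end
	
	def closed?
		@closed
	end
end

# Hypothetical body reproducing only #initialize and #call from the listing.
class SketchBody
	def initialize(block, input = nil)
		@block = block
		@input = input
		@output = nil
	end
	
	def call(stream)
		if @block.nil?
			raise ConsumedError, "Streaming block has already been consumed!"
		end
		
		block = @block
		@input = @output = @block = nil
		
		# The block now owns the stream and is expected to close it.
		block.call(stream)
	rescue
		# The block may not have closed the stream, so close it and re-raise.
		stream.close
		raise
	end
end

# A well-behaved block writes its chunks and then closes the stream.
body = SketchBody.new(proc do |stream|
	stream.write("Hello")
	stream.write(" World")
	stream.close
end)

stream = FakeStream.new
body.call(stream)

# A second call fails because the block was cleared on first use.
second_error = begin
	body.call(FakeStream.new)
	nil
rescue ConsumedError => error
	error
end

# A block that raises never closes the stream; the rescue clause does.
failing_stream = FakeStream.new
failing_error = begin
	SketchBody.new(proc { |s| raise IOError, "boom" }).call(failing_stream)
	nil
rescue IOError => error
	error
end
```

Note that the consumed check sits inside the same method-level rescue, so the stream passed to a second call is also closed before ConsumedError propagates.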

#close_input(error = nil) ⇒ Object

Close the input. The streaming body will eventually read all the input.



# File 'lib/protocol/http/body/streamable.rb', line 139

def close_input(error = nil)
	if input = @input
		@input = nil
		input.close(error)
	end
end
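
Because #close_input clears @input before closing it, calling it repeatedly closes the underlying input at most once. The sketch below illustrates that pattern in isolation; `CountingInput` and `InputHolder` are hypothetical names used only for this example, and only the #close_input body is taken from the listing above.

```ruby
# Hypothetical input that counts how often it is closed.
class CountingInput
	attr_reader :close_count, :last_error
	
	def initialize
		@close_count = 0
		@last_error = nil
	end
	
	def close(error = nil)
		@close_count += 1
		@last_error = error
	end
end

# Hypothetical holder using the same #close_input body as the listing.
class InputHolder
	def initialize(input)
		@input = input
	end
	
	def close_input(error = nil)
		if input = @input
			# Clear the reference first so a repeated call is a no-op.
			@input = nil
			input.close(error)
		end
	end
end

input = CountingInput.new
holder = InputHolder.new(input)

holder.close_input(IOError.new("client went away"))
holder.close_input # No effect: the input reference is already cleared.
```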

#close_output(error = nil) ⇒ Object

Close the output. The streaming body will be unable to write any more output.



# File 'lib/protocol/http/body/streamable.rb', line 149

def close_output(error = nil)
	@output&.close(error)
end

#read ⇒ Object

Invokes the block in a fiber which yields chunks when they are available.



# File 'lib/protocol/http/body/streamable.rb', line 102

def read
	# We are reading chunk by chunk, allocate an output stream and execute the block to generate the chunks:
	if @output.nil?
		if @block.nil?
			raise ConsumedError, "Streaming body has already been consumed!"
		end
		
		@output = Output.schedule(@input, @block)
		@block = nil
	end
	
	@output.read
end
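
The `Output` class used by #read is not shown on this page, so its behavior can only be inferred from the description above. As an illustration of the general idea (a block running in a fiber and handing back one chunk per read), here is a minimal sketch built on Ruby's core `Fiber`. `SketchOutput` is a hypothetical class, it ignores the input argument, and it should not be read as the library's implementation.

```ruby
# Hypothetical illustration of fiber-based chunk generation.
class SketchOutput
	def initialize(block)
		@fiber = Fiber.new do
			block.call(self)
			nil # A nil result tells the reader that the body is finished.
		end
	end
	
	# Called by the block: suspend the fiber and hand one chunk to the reader.
	def write(chunk)
		Fiber.yield(chunk)
	end
	
	# Called by the reader: run the block until its next chunk or its end.
	def read
		@fiber.resume if @fiber.alive?
	end
end

output = SketchOutput.new(proc do |out|
	out.write("first")
	out.write("second")
end)

# Pull chunks one at a time until the block finishes.
chunks = []
while (chunk = output.read)
	chunks << chunk
end
```

In this sketch the block only makes progress when the reader asks for a chunk, which is what lets a body be consumed chunk by chunk instead of being buffered up front.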

#stream? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/protocol/http/body/streamable.rb', line 97

def stream?
	true
end