Class: Protocol::HTTP::Body::Streamable::Body
Defined in: lib/protocol/http/body/streamable.rb
Overview
A streaming body that can be read from and written to.
Direct Known Subclasses
Instance Method Summary
- #call(stream) ⇒ Object
  Invoke the block with the given stream.
- #close_input(error = nil) ⇒ Object
  Close the input.
- #close_output(error = nil) ⇒ Object
  Close the output. The streaming body will be unable to write any more output.
- #initialize(block, input = nil) ⇒ Body (constructor)
  Initialize the body with the given block and input.
- #read ⇒ Object
  Invokes the block in a fiber which yields chunks when they are available.
- #stream? ⇒ Boolean
Methods inherited from Readable
#as_json, #buffered, #close, #discard, #each, #empty?, #finish, #join, #length, #ready?, #rewind, #rewindable?, #to_json
Constructor Details
#initialize(block, input = nil) ⇒ Body
Initialize the body with the given block and input.
# File 'lib/protocol/http/body/streamable.rb', line 90

def initialize(block, input = nil)
  @block = block
  @input = input
  @output = nil
end
Instance Method Details
#call(stream) ⇒ Object
Invoke the block with the given stream. The block can read and write to the stream, and must close the stream when finishing.
# File 'lib/protocol/http/body/streamable.rb', line 119

def call(stream)
  if @block.nil?
    raise ConsumedError, "Streaming block has already been consumed!"
  end

  block = @block

  @input = @output = @block = nil

  # Ownership of the stream is passed into the block, in other words, the block is responsible for closing the stream.
  block.call(stream)
rescue => error
  # If, for some reason, the block raises an error, we assume it may not have closed the stream, so we close it here:
  stream.close
  raise
end
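The ownership rule in #call can be illustrated with a small self-contained sketch. Everything below is a stand-in written for this example: `FakeStream`, `SketchBody` and the locally defined `ConsumedError` are hypothetical names, not part of protocol-http, and `SketchBody#call` only mirrors the logic of the listing above.

```ruby
# Hypothetical stand-ins for illustration only; not part of protocol-http.
class ConsumedError < StandardError; end

# Records writes and whether it has been closed.
class FakeStream
  attr_reader :chunks

  def initialize
    @chunks = []
    @closed = false
  end

  def write(chunk)
    @chunks << chunk
  end

  def close
    @closed = true
  end

  def closed?
    @closed
  end
end

# Mirrors only the #call logic from the listing above.
class SketchBody
  def initialize(block)
    @block = block
  end

  def call(stream)
    raise ConsumedError, "Streaming block has already been consumed!" if @block.nil?

    block = @block
    @block = nil

    # The block now owns the stream and is expected to close it.
    block.call(stream)
  rescue
    # Something raised, so the stream may still be open: close it, then re-raise.
    stream.close
    raise
  end
end

# A well-behaved block writes and then closes the stream itself.
ok_stream = FakeStream.new
SketchBody.new(->(stream) { stream.write("Hello"); stream.close }).call(ok_stream)

# A failing block never reaches its own close, so #call closes the stream for it.
bad_stream = FakeStream.new
begin
  SketchBody.new(->(_stream) { raise IOError, "boom" }).call(bad_stream)
rescue IOError
  # The original error is re-raised after the stream has been closed.
end

puts ok_stream.chunks.inspect
puts bad_stream.closed?
```

Because the block reference is cleared before it runs, a second #call on the same body raises `ConsumedError`. In this sketch, as in the listing, that error also passes through the `rescue` clause, so the stream handed to the second call is closed as well.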
#close_input(error = nil) ⇒ Object
Close the input. The streaming body will eventually read all the input.
# File 'lib/protocol/http/body/streamable.rb', line 139

def close_input(error = nil)
  if input = @input
    @input = nil
    input.close(error)
  end
end
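The take-then-clear pattern in #close_input means the underlying input is closed at most once, even if the method is called repeatedly. A minimal sketch of that behaviour, using the hypothetical stand-ins `FakeInput` and `InputHolder` (neither is part of protocol-http):

```ruby
# Hypothetical stand-ins for illustration only; not part of protocol-http.

# Records every close call together with the error it was given.
class FakeInput
  attr_reader :close_calls

  def initialize
    @close_calls = []
  end

  def close(error = nil)
    @close_calls << error
  end
end

# Holds an input and mirrors the #close_input logic from the listing above.
class InputHolder
  def initialize(input)
    @input = input
  end

  def close_input(error = nil)
    if input = @input
      # Clear the reference first so a repeated call becomes a no-op.
      @input = nil
      input.close(error)
    end
  end
end

input = FakeInput.new
holder = InputHolder.new(input)
failure = IOError.new("connection reset")

holder.close_input(failure) # Forwards the error to the input.
holder.close_input          # No-op: the reference was already cleared.

puts input.close_calls.length
```

A holder constructed without an input simply skips the branch, so calling `close_input` on it is also safe.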
#close_output(error = nil) ⇒ Object
Close the output. The streaming body will be unable to write any more output.
# File 'lib/protocol/http/body/streamable.rb', line 149

def close_output(error = nil)
  @output&.close(error)
end
#read ⇒ Object
Invokes the block in a fiber which yields chunks when they are available.
# File 'lib/protocol/http/body/streamable.rb', line 102

def read
  # We are reading chunk by chunk, allocate an output stream and execute the block to generate the chunks:
  if @output.nil?
    if @block.nil?
      raise ConsumedError, "Streaming body has already been consumed!"
    end

    @output = Output.schedule(@input, @block)

    @block = nil
  end

  @output.read
end
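The idea of running a push-style block in a fiber so that a reader can pull chunks one at a time can be sketched with Ruby's core `Fiber` class. This is an illustration of the general technique only, not the library's `Output` class: `FiberOutput` and `PullBody` are hypothetical names, input handling is omitted, and so is the consumed check.

```ruby
# Hypothetical stand-ins for illustration only; not part of protocol-http.

# Runs a block in a fiber and turns each write into one value returned by #read.
class FiberOutput
  def initialize(block)
    @fiber = Fiber.new do
      block.call(self)
      nil # The block has returned: report end-of-stream to the reader.
    end
  end

  # Called from inside the block: suspend it and hand the chunk to the reader.
  def write(chunk)
    Fiber.yield(chunk)
  end

  # Nothing to release in this sketch.
  def close
  end

  # Resume the block until its next write; return nil once it has finished.
  def read
    @fiber.alive? ? @fiber.resume : nil
  end
end

# Creates the output lazily on the first read, as in the listing above.
class PullBody
  def initialize(block)
    @block = block
    @output = nil
  end

  def read
    if @output.nil?
      @output = FiberOutput.new(@block)
      @block = nil
    end

    @output.read
  end
end

body = PullBody.new(lambda do |stream|
  stream.write("Hello")
  stream.write(" ")
  stream.write("World")
  stream.close
end)

chunks = []
while (chunk = body.read)
  chunks << chunk
end

puts chunks.join
```

The block does not start running until the first `read`, and it only advances as far as its next `write` on each call, so the reader controls the pace at which chunks are produced.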
#stream? ⇒ Boolean
# File 'lib/protocol/http/body/streamable.rb', line 97

def stream?
  true
end