Class: Itsi::GrpcCall
- Inherits:
-
Object
- Object
- Itsi::GrpcCall
- Defined in:
- lib/itsi/server/grpc/grpc_call.rb
Instance Attribute Summary collapse
-
#rpc_desc ⇒ Object
Returns the value of attribute rpc_desc.
Instance Method Summary collapse
- #bidi_streamer? ⇒ Boolean
- #client_streamer? ⇒ Boolean
- #close ⇒ Object
- #deadline ⇒ Object
- #deadline_exceeded? ⇒ Boolean
- #each_remote_read ⇒ Object (also: #each)
- #input_stream? ⇒ Boolean
- #input_type ⇒ Object
- #output_stream? ⇒ Boolean
- #output_type ⇒ Object
- #parse_from_json_stream(json_stream) ⇒ Object
- #reader ⇒ Object
- #remote_read ⇒ Object
- #remote_send(response) ⇒ Object
- #request_response? ⇒ Boolean
- #send_empty ⇒ Object
- #send_framed_message(message_data, compressed = nil) ⇒ Object
- #send_initial_metadata(kv_pairs) ⇒ Object
- #send_status(status_code, status_details, trailing_metadata = {}) ⇒ Object
- #send_trailers(trailers) ⇒ Object
- #server_streamer? ⇒ Boolean
Instance Attribute Details
#rpc_desc ⇒ Object
Returns the value of attribute rpc_desc.
7 8 9 |
# File 'lib/itsi/server/grpc/grpc_call.rb', line 7 def rpc_desc @rpc_desc end |
Instance Method Details
#bidi_streamer? ⇒ Boolean
195 196 197 |
# File 'lib/itsi/server/grpc/grpc_call.rb', line 195 def bidi_streamer? input_stream? && output_stream? end |
#client_streamer? ⇒ Boolean
199 200 201 |
# File 'lib/itsi/server/grpc/grpc_call.rb', line 199 def client_streamer? input_stream? && !output_stream? end |
#close ⇒ Object
29 30 31 32 33 34 35 36 37 |
# File 'lib/itsi/server/grpc/grpc_call.rb', line 29 def close if output_stream? && content_type == "application/json" stream.write("[") unless @opened stream.write("]") end @reader&.close stream.close end |
#deadline ⇒ Object
39 40 41 42 43 44 |
# File 'lib/itsi/server/grpc/grpc_call.rb', line 39 def deadline return @deadline if defined?(@deadline) return @deadline = nil unless timeout @deadline = Time.now + timeout end |
#deadline_exceeded? ⇒ Boolean
180 181 182 |
# File 'lib/itsi/server/grpc/grpc_call.rb', line 180 def deadline_exceeded? @deadline && @deadline <= Time.now end |
#each_remote_read ⇒ Object Also known as: each
184 185 186 187 188 189 190 191 |
# File 'lib/itsi/server/grpc/grpc_call.rb', line 184 def each_remote_read return enum_for(:each_remote_read) unless block_given? while (resp = remote_read) || !cancelled? yield resp end end |
#input_stream? ⇒ Boolean
9 10 11 |
# File 'lib/itsi/server/grpc/grpc_call.rb', line 9 def input_stream? @input_stream ||= @rpc_desc&.input.is_a?(GRPC::RpcDesc::Stream) || false end |
#input_type ⇒ Object
17 18 19 |
# File 'lib/itsi/server/grpc/grpc_call.rb', line 17 def input_type @input_type ||= input_stream? ? rpc_desc.input.type : rpc_desc.input end |
#output_stream? ⇒ Boolean
13 14 15 |
# File 'lib/itsi/server/grpc/grpc_call.rb', line 13 def output_stream? @output_stream ||= @rpc_desc&.output.is_a?(GRPC::RpcDesc::Stream) || false end |
#output_type ⇒ Object
21 22 23 |
# File 'lib/itsi/server/grpc/grpc_call.rb', line 21 def output_type @output_type ||= output_stream? ? rpc_desc.output.type : rpc_desc.output end |
#parse_from_json_stream(json_stream) ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/itsi/server/grpc/grpc_call.rb', line 46 def parse_from_json_stream(json_stream) first_char = nil loop do char = json_stream.read(1) break if char.nil? if char =~ /\s/ next elsif ["[", ","].include?(char) first_char = char break elsif char == "]" return nil else # If the first non-whitespace character is not '[' or comma, return nil. return nil end end return nil if first_char.nil? # Step 2: Process objects until we hit the end of the JSON stream or array. loop do # rubocop:disable Lint/UnreachableLoop,Metrics/BlockLength # Skip any whitespace or commas preceding an object. char = nil loop do char = json_stream.read(1) break if char.nil? next if char =~ /\s/ break end # The next non-whitespace, non-comma character should be the start of an object. return nil unless char == "{" # Step 3: Start buffering the JSON object. buffer = "{".dup stack = ["{"] in_string = false escape = false while stack.any? ch = json_stream.read(1) return nil if ch.nil? # premature end of stream buffer << ch if in_string if escape escape = false next end if ch == "\\" escape = true elsif ch == '"' in_string = false end elsif ch == '"' in_string = true elsif ["{", "["].include?(ch) stack.push(ch) elsif ["}", "]"].include?(ch) expected = (ch == "}" ? "{" : "[") # Check for matching bracket. return nil unless stack.last == expected stack.pop end end # Yield the complete JSON object (as a string). return buffer end end |
#reader ⇒ Object
25 26 27 |
# File 'lib/itsi/server/grpc/grpc_call.rb', line 25 def reader @reader ||= IO.open(stream.reader_fileno, "rb") end |
#remote_read ⇒ Object
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/itsi/server/grpc/grpc_call.rb', line 119 def remote_read if content_type == "application/json" if input_stream? if (next_item = parse_from_json_stream(reader)) input_type.decode_json(next_item) end else input_type.decode_json(reader.read) end else header = reader.read(5) return nil if header.nil? || header.bytesize < 5 compressed = header.bytes[0] == 1 length = header[1..4].unpack1("N") data = reader.read(length) return nil if data.nil? data = decompress_input(data) if compressed input_type.decode(data) end end |
#remote_send(response) ⇒ Object
176 177 178 |
# File 'lib/itsi/server/grpc/grpc_call.rb', line 176 def remote_send(response) (response) end |
#request_response? ⇒ Boolean
207 208 209 |
# File 'lib/itsi/server/grpc/grpc_call.rb', line 207 def request_response? !input_stream? && !output_stream? end |
#send_empty ⇒ Object
215 216 217 |
# File 'lib/itsi/server/grpc/grpc_call.rb', line 215 def send_empty remote_send(output_type.new) unless @body_written end |
#send_framed_message(message_data, compressed = nil) ⇒ Object
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
# File 'lib/itsi/server/grpc/grpc_call.rb', line 144 def (, compressed = nil) if content_type == "application/json" if output_stream? if @opened stream.write(",\n") else stream.write("[") @opened = true end end = output_type.encode_json() stream.write() else = output_type.encode() should_compress = compressed.nil? ? should_compress_output?(.bytesize) : compressed if should_compress = compress_output() compressed_flag = 1 else compressed_flag = 0 end = [compressed_flag, .bytesize].pack("CN") << stream.write() @body_written = true end rescue IOError close end |
#send_initial_metadata(kv_pairs) ⇒ Object
211 212 213 |
# File 'lib/itsi/server/grpc/grpc_call.rb', line 211 def (kv_pairs) add_headers(kv_pairs.transform_values { |v| Array(v) }) end |
#send_status(status_code, status_details, trailing_metadata = {}) ⇒ Object
219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 |
# File 'lib/itsi/server/grpc/grpc_call.rb', line 219 def send_status(status_code, status_details, = {}) trailers = { "grpc-status" => status_code.to_s } unless status_details.nil? || status_details.empty? = status_details.gsub(/[ #%&+,:;<=>?@\[\]^`{|}~\\"]/) do |c| "%" + c.ord.to_s(16).upcase end trailers["grpc-message"] = end .each do |key, value| trailers[key] = \ if key.to_s.end_with?("-bin") Base64.strict_encode64(value.to_s) else value.to_s end end send_trailers(trailers) end |
#send_trailers(trailers) ⇒ Object
243 244 245 |
# File 'lib/itsi/server/grpc/grpc_call.rb', line 243 def send_trailers(trailers) stream.send_trailers(trailers) end |
#server_streamer? ⇒ Boolean
203 204 205 |
# File 'lib/itsi/server/grpc/grpc_call.rb', line 203 def server_streamer? !input_stream? && output_stream? end |