Class: Itsi::GrpcCall

Inherits:
Object
  • Object
show all
Defined in:
lib/itsi/server/grpc/grpc_call.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#rpc_desc ⇒ Object

Returns the value of attribute rpc_desc.



7
8
9
# File 'lib/itsi/server/grpc/grpc_call.rb', line 7

# Reader for the @rpc_desc instance variable; yields nil until something
# assigns it.
# NOTE(review): presumably a GRPC::RpcDesc -- input_stream?/output_stream?
# compare its #input/#output against GRPC::RpcDesc::Stream; confirm against
# the code that sets it.
def rpc_desc
  @rpc_desc
end

Instance Method Details

#bidi_streamer? ⇒ Boolean

Returns:

  • (Boolean)


195
196
197
# File 'lib/itsi/server/grpc/grpc_call.rb', line 195

# True when the call streams in both directions, i.e. both the request side
# (input_stream?) and the response side (output_stream?) are streams.
#
# @return [Boolean]
def bidi_streamer?
  streams_in = input_stream?
  streams_in && output_stream?
end

#client_streamer? ⇒ Boolean

Returns:

  • (Boolean)


199
200
201
# File 'lib/itsi/server/grpc/grpc_call.rb', line 199

# True when only the request side is a stream: input_stream? holds and
# output_stream? does not.
#
# @return [Boolean]
def client_streamer?
  streams_in = input_stream?
  streams_in && !output_stream?
end

#close ⇒ Object



29
30
31
32
33
34
35
36
37
# File 'lib/itsi/server/grpc/grpc_call.rb', line 29

# Finishes the response body and releases the call's I/O handles.
#
# For a streamed "application/json" response the JSON array is terminated
# first: "[" is written if no element was ever sent (@opened unset), then "]".
#
# The reader (if one was opened) and the stream are closed in `ensure`
# clauses, so they are released even when writing the closing bracket raises
# (e.g. the peer already went away). The exception itself still propagates.
#
# @return [Object] whatever stream.close returns
def close
  begin
    if output_stream? && content_type == "application/json"
      stream.write("[") unless @opened
      stream.write("]")
    end
  ensure
    begin
      # @reader is only set once #reader has been called.
      @reader&.close
    ensure
      stream.close
    end
  end
end

#deadline ⇒ Object



39
40
41
42
43
44
# File 'lib/itsi/server/grpc/grpc_call.rb', line 39

# Lazily computes and caches the call deadline.
#
# On first use the deadline becomes Time.now + timeout, or nil when timeout is
# falsy. The result (including nil) is cached in @deadline, so later calls do
# not restart the clock.
#
# @return [Time, nil]
def deadline
  unless defined?(@deadline)
    @deadline = timeout ? Time.now + timeout : nil
  end
  @deadline
end

#deadline_exceeded? ⇒ Boolean

Returns:

  • (Boolean)


180
181
182
# File 'lib/itsi/server/grpc/grpc_call.rb', line 180

# Reports whether the cached deadline has passed.
#
# Returns nil (not false) when @deadline is unset.
# NOTE(review): this reads @deadline directly rather than calling #deadline,
# so it never reports true until #deadline has been invoked at least once --
# confirm that is intended.
#
# @return [Boolean, nil]
def deadline_exceeded?
  cutoff = @deadline
  cutoff && Time.now >= cutoff
end

#each_remote_read ⇒ Object Also known as: each



184
185
186
187
188
189
190
191
# File 'lib/itsi/server/grpc/grpc_call.rb', line 184

# Yields each value returned by #remote_read until a read comes back empty
# while the call is cancelled. Without a block, returns an Enumerator.
#
# NOTE(review): when remote_read returns nil but cancelled? is still false,
# the nil itself is yielded and reading continues -- confirm consumers expect
# nil elements.
#
# @yieldparam message [Object, nil] the decoded message from remote_read
# @return [Enumerator, nil]
def each_remote_read
  return to_enum(:each_remote_read) unless block_given?

  # cancelled? is only consulted when the read produced nothing.
  until !(message = remote_read) && cancelled?
    yield message
  end
end

#input_stream? ⇒ Boolean

Returns:

  • (Boolean)


9
10
11
# File 'lib/itsi/server/grpc/grpc_call.rb', line 9

# Whether the request side of the RPC is a stream, i.e. @rpc_desc.input is a
# GRPC::RpcDesc::Stream.
#
# The answer is cached in @input_stream. A `defined?` guard is used instead of
# `||=` because `||=` never caches a false result and would re-run the check
# on every call. While @rpc_desc is still nil the method answers false without
# caching, so a descriptor assigned later is still honoured.
#
# @return [Boolean]
def input_stream?
  return @input_stream if defined?(@input_stream)
  return false unless @rpc_desc

  @input_stream = @rpc_desc.input.is_a?(GRPC::RpcDesc::Stream)
end

#input_type ⇒ Object



17
18
19
# File 'lib/itsi/server/grpc/grpc_call.rb', line 17

# The request message type for this RPC, cached in @input_type.
#
# For streaming input the descriptor's input is a wrapper whose #type is the
# message type; otherwise the descriptor's input is used as-is.
#
# @return [Object] the value of rpc_desc.input or rpc_desc.input.type
def input_type
  return @input_type if @input_type

  @input_type =
    if input_stream?
      rpc_desc.input.type
    else
      rpc_desc.input
    end
end

#output_stream? ⇒ Boolean

Returns:

  • (Boolean)


13
14
15
# File 'lib/itsi/server/grpc/grpc_call.rb', line 13

# Whether the response side of the RPC is a stream, i.e. @rpc_desc.output is a
# GRPC::RpcDesc::Stream.
#
# The answer is cached in @output_stream. A `defined?` guard is used instead
# of `||=` because `||=` never caches a false result and would re-run the
# check on every call. While @rpc_desc is still nil the method answers false
# without caching, so a descriptor assigned later is still honoured.
#
# @return [Boolean]
def output_stream?
  return @output_stream if defined?(@output_stream)
  return false unless @rpc_desc

  @output_stream = @rpc_desc.output.is_a?(GRPC::RpcDesc::Stream)
end

#output_type ⇒ Object



21
22
23
# File 'lib/itsi/server/grpc/grpc_call.rb', line 21

# The response message type for this RPC, cached in @output_type.
#
# For streaming output the descriptor's output is a wrapper whose #type is the
# message type; otherwise the descriptor's output is used as-is.
#
# @return [Object] the value of rpc_desc.output or rpc_desc.output.type
def output_type
  return @output_type if @output_type

  @output_type =
    if output_stream?
      rpc_desc.output.type
    else
      rpc_desc.output
    end
end

#parse_from_json_stream(json_stream) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/itsi/server/grpc/grpc_call.rb', line 46

# Extracts the next JSON object from a streamed JSON array, reading one
# character at a time via json_stream.read(1).
#
# Each call consumes exactly one element: an introducing "[" (first element)
# or "," (later elements), then a single balanced "{...}" object. Whitespace
# before the delimiter and before the object is skipped.
#
# @param json_stream [#read] source that returns a 1-character string or nil
#   at end of input
# @return [String, nil] the raw JSON text of the object, or nil when the next
#   token is not an element (closing "]", end of input, unexpected character),
#   the input ends mid-object, or brackets are mismatched
def parse_from_json_stream(json_stream)
  # Step 1: the first non-whitespace character must introduce an element.
  delimiter = json_stream.read(1)
  delimiter = json_stream.read(1) while delimiter&.match?(/\s/)
  return nil unless ["[", ","].include?(delimiter)

  # Step 2: after optional whitespace, an object must start. Note that only
  # whitespace is skipped here; a second comma is rejected.
  opener = json_stream.read(1)
  opener = json_stream.read(1) while opener&.match?(/\s/)
  return nil unless opener == "{"

  # Step 3: buffer characters until the opening brace is balanced, ignoring
  # brackets that appear inside string literals.
  json_text = "{".dup
  open_brackets = ["{"]
  inside_string = false
  escaped = false

  until open_brackets.empty?
    char = json_stream.read(1)
    return nil unless char # premature end of stream

    json_text << char

    if escaped
      # The character after a backslash is taken literally.
      escaped = false
    elsif inside_string
      case char
      when "\\" then escaped = true
      when '"' then inside_string = false
      end
    else
      case char
      when '"'
        inside_string = true
      when "{", "["
        open_brackets.push(char)
      when "}", "]"
        expected_opener = char == "}" ? "{" : "["
        # A closer must match the most recent opener.
        return nil unless open_brackets.pop == expected_opener
      end
    end
  end

  json_text
end

#reader ⇒ Object



25
26
27
# File 'lib/itsi/server/grpc/grpc_call.rb', line 25

# Lazily opens (and caches in @reader) a binary-mode IO over the file
# descriptor reported by stream.reader_fileno.
#
# @return [IO]
def reader
  return @reader if @reader

  @reader = IO.open(stream.reader_fileno, "rb")
end

#remote_read ⇒ Object



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/itsi/server/grpc/grpc_call.rb', line 119

# Reads and decodes the next request message from #reader.
#
# * "application/json", streaming input: pulls the next array element with
#   parse_from_json_stream and decodes it via input_type.decode_json; nil when
#   no element is available.
# * "application/json", unary input: decodes the entire remaining body.
# * anything else: reads one length-prefixed frame -- a 5-byte header (1 flag
#   byte, 4-byte big-endian length) followed by the payload -- decompressing
#   the payload when the flag byte is 1, then decodes via input_type.decode.
#
# A truncated frame is treated as end of input: nil is returned when either
# the header or the payload comes up short, instead of handing a partial
# payload to the decoder.
#
# @return [Object, nil] the decoded message, or nil when nothing could be read
def remote_read
  if content_type == "application/json"
    return input_type.decode_json(reader.read) unless input_stream?

    next_item = parse_from_json_stream(reader)
    return next_item ? input_type.decode_json(next_item) : nil
  end

  header = reader.read(5)
  return nil if header.nil? || header.bytesize < 5

  compressed_flag, length = header.unpack("CN")

  data = reader.read(length)
  # Apply the same short-read rule as the header check above.
  return nil if data.nil? || data.bytesize < length

  data = decompress_input(data) if compressed_flag == 1

  input_type.decode(data)
end

#remote_send(response) ⇒ Object



176
177
178
# File 'lib/itsi/server/grpc/grpc_call.rb', line 176

# Sends one response message to the peer by delegating to
# send_framed_message with its default compression argument.
#
# @param response [Object] the message handed to send_framed_message
# @return [Object] whatever send_framed_message returns
def remote_send(response)
  send_framed_message(response)
end

#request_response? ⇒ Boolean

Returns:

  • (Boolean)


207
208
209
# File 'lib/itsi/server/grpc/grpc_call.rb', line 207

# True for a plain unary call: neither the request side (input_stream?) nor
# the response side (output_stream?) is a stream.
#
# @return [Boolean]
def request_response?
  !(input_stream? || output_stream?)
end

#send_empty ⇒ Object



215
216
217
# File 'lib/itsi/server/grpc/grpc_call.rb', line 215

# Sends a default-constructed output message (output_type.new) unless a body
# has already been written, as tracked by @body_written.
#
# @return [Object, nil] the result of remote_send, or nil when skipped
def send_empty
  return if @body_written

  remote_send(output_type.new)
end

#send_framed_message(message_data, compressed = nil) ⇒ Object



144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/itsi/server/grpc/grpc_call.rb', line 144

# Encodes one response message and writes it to the stream.
#
# * "application/json": the message is encoded with output_type.encode_json.
#   For streaming output the elements are wrapped in a JSON array: "[" goes
#   out before the first element (setting @opened) and ",\n" before each later
#   one; #close writes the terminating "]".
# * anything else: the message is encoded with output_type.encode, optionally
#   compressed, and written as a frame of 1 flag byte + 4-byte big-endian
#   length + payload. @body_written is then set to true.
#
# NOTE(review): @body_written is only set on the binary path, so #send_empty
# will still send a message after a JSON write -- confirm that is intended.
#
# An IOError from any write is swallowed and answered by calling #close.
#
# @param message_data [Object] message accepted by output_type's encoder
# @param compressed [Boolean, nil] force compression on/off; nil defers to
#   should_compress_output?(payload size)
# @return [Object] the last write's result (JSON), true (binary), or the
#   result of #close after an IOError
def send_framed_message(message_data, compressed = nil)
  unless content_type == "application/json"
    payload = output_type.encode(message_data)

    compress = compressed
    compress = should_compress_output?(payload.bytesize) if compress.nil?

    flag = 0
    if compress
      payload = compress_output(payload)
      flag = 1
    end

    stream.write([flag, payload.bytesize].pack("CN") << payload)
    return @body_written = true
  end

  if output_stream?
    stream.write(@opened ? ",\n" : "[")
    @opened ||= true
  end

  stream.write(output_type.encode_json(message_data))
rescue IOError
  close
end

#send_initial_metadata(kv_pairs) ⇒ Object



211
212
213
# File 'lib/itsi/server/grpc/grpc_call.rb', line 211

# Registers response headers for the call.
#
# Every value is normalised to an Array (via Kernel#Array) before the hash is
# handed to add_headers, so scalar and multi-valued entries are passed in the
# same shape.
#
# @param kv_pairs [Hash] header name => value or array of values
# @return [Object] whatever add_headers returns
def send_initial_metadata(kv_pairs)
  add_headers(kv_pairs.transform_values { |v| Array(v) })
end

#send_status(status_code, status_details, trailing_metadata = {}) ⇒ Object



219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
# File 'lib/itsi/server/grpc/grpc_call.rb', line 219

# Builds the trailer set for the call's final status and sends it via
# send_trailers.
#
# * "grpc-status" is always set to status_code.to_s.
# * "grpc-message" is set only for a non-empty status_details and is
#   percent-encoded byte by byte: the space and the reserved punctuation
#   listed in the pattern below are escaped as "%XX", and so is every byte
#   outside printable ASCII (control characters, UTF-8 multi-byte sequences),
#   so the header value never carries raw newlines or non-ASCII bytes.
# * Each trailing_metadata entry is stringified; values whose key ends in
#   "-bin" are Base64 (strict) encoded.
#
# @param status_code [#to_s] numeric gRPC status code
# @param status_details [String, nil] human-readable status message
# @param trailing_metadata [Hash, nil] extra trailers; nil is treated as empty
# @return [Object] whatever send_trailers returns
def send_status(status_code, status_details, trailing_metadata = {})
  trailers = {
    "grpc-status" => status_code.to_s
  }

  unless status_details.nil? || status_details.empty?
    # Work on a binary copy so multi-byte characters are escaped per byte.
    encoded_message = status_details.b.gsub(/[^\x21-\x7E]|[#%&+,:;<=>?@\[\]^`{|}~\\"]/) do |byte|
      format("%%%02X", byte.ord)
    end
    # The result is pure ASCII; tag it as UTF-8 rather than leaving it binary.
    trailers["grpc-message"] = encoded_message.force_encoding(Encoding::UTF_8)
  end

  (trailing_metadata || {}).each do |key, value|
    trailers[key] =
      if key.to_s.end_with?("-bin")
        Base64.strict_encode64(value.to_s)
      else
        value.to_s
      end
  end

  send_trailers(trailers)
end

#send_trailers(trailers) ⇒ Object



243
244
245
# File 'lib/itsi/server/grpc/grpc_call.rb', line 243

# Hands the trailer hash to the underlying stream's #send_trailers.
#
# @param trailers [Hash] trailer name => value, as built by send_status
# @return [Object] whatever stream.send_trailers returns
def send_trailers(trailers)
  stream.send_trailers(trailers)
end

#server_streamer? ⇒ Boolean

Returns:

  • (Boolean)


203
204
205
# File 'lib/itsi/server/grpc/grpc_call.rb', line 203

# True when only the response side is a stream: output_stream? holds and
# input_stream? does not.
#
# @return [Boolean]
def server_streamer?
  return false if input_stream?

  output_stream?
end