Class: TestTarget

Inherits:
Grpc::Testing::LoadBalancerStatsService::Service show all
Includes:
Grpc::Testing, Grpc::Testing::PayloadType
Defined in:
src/ruby/pb/test/server.rb,
src/ruby/pb/test/xds_client.rb

Overview

This implements LoadBalancerStatsService required by the test runner

Constant Summary

Constants included from Grpc::Testing

Grpc::Testing::BoolValue, Grpc::Testing::ClientConfigureRequest, Grpc::Testing::ClientConfigureResponse, Grpc::Testing::EchoStatus, Grpc::Testing::Empty, Grpc::Testing::EmptyMessage, Grpc::Testing::GaugeRequest, Grpc::Testing::GaugeResponse, Grpc::Testing::GrpclbRouteType, Grpc::Testing::HookRequest, Grpc::Testing::HookResponse, Grpc::Testing::LoadBalancerAccumulatedStatsRequest, Grpc::Testing::LoadBalancerAccumulatedStatsResponse, Grpc::Testing::LoadBalancerStatsRequest, Grpc::Testing::LoadBalancerStatsResponse, Grpc::Testing::MemorySize, Grpc::Testing::Payload, Grpc::Testing::PayloadType, Grpc::Testing::ReconnectInfo, Grpc::Testing::ReconnectParams, Grpc::Testing::ResponseParameters, Grpc::Testing::SetReturnStatusRequest, Grpc::Testing::SimpleRequest, Grpc::Testing::SimpleResponse, Grpc::Testing::StreamingInputCallRequest, Grpc::Testing::StreamingInputCallResponse, Grpc::Testing::StreamingOutputCallRequest, Grpc::Testing::StreamingOutputCallResponse, Grpc::Testing::TestOrcaReport

Instance Method Summary collapse

Methods included from GRPC::GenericService

included, underscore

Instance Method Details

#empty_call(_empty, _call) ⇒ Object



175
176
177
# File 'src/ruby/pb/test/server.rb', line 175

def empty_call(_empty, _call)
  Empty.new
end

#full_duplex_call(reqs, _call) ⇒ Object



201
202
203
204
205
# File 'src/ruby/pb/test/server.rb', line 201

def full_duplex_call(reqs, _call)
  (_call)
  # reqs is a lazy Enumerator of the requests sent by the client.
  FullDuplexEnumerator.new(reqs).each_item
end

#get_client_accumulated_stats(req, _call) ⇒ Object



174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'src/ruby/pb/test/xds_client.rb', line 174

def get_client_accumulated_stats(req, _call)
  $accumulated_stats_mu.synchronize do
    all_stats_per_method = $accumulated_method_stats.map { |rpc, stats_per_method|
      [rpc,
       LoadBalancerAccumulatedStatsResponse::MethodStats.new(
        rpcs_started: stats_per_method.rpcs_started,
        result: stats_per_method.result
       )]
    }.to_h
    LoadBalancerAccumulatedStatsResponse.new(
      num_rpcs_started_by_method: $num_rpcs_started_by_method,
      num_rpcs_succeeded_by_method: $num_rpcs_succeeded_by_method,
      num_rpcs_failed_by_method: $num_rpcs_failed_by_method,
      stats_per_method: all_stats_per_method,
    )
  end
end

#get_client_stats(req, _call) ⇒ Object



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'src/ruby/pb/test/xds_client.rb', line 139

def get_client_stats(req, _call)
  finish_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) +
                req['timeout_sec']
  watcher = {}
  $watchers_mutex.synchronize do
    watcher = {
      "rpcs_by_method" => Hash.new(),
      "rpcs_by_peer" => Hash.new(0),
      "rpcs_needed" => req['num_rpcs'],
      "no_remote_peer" => 0
    }
    $watchers << watcher
    seconds_remaining = finish_time -
                        Process.clock_gettime(Process::CLOCK_MONOTONIC)
    while watcher['rpcs_needed'] > 0 && seconds_remaining > 0
      $watchers_cv.wait($watchers_mutex, seconds_remaining)
      seconds_remaining = finish_time -
                          Process.clock_gettime(Process::CLOCK_MONOTONIC)
    end
    $watchers.delete_at($watchers.index(watcher))
  end
  # convert results into proper proto object
  rpcs_by_method = {}
  watcher['rpcs_by_method'].each do |rpc_name, rpcs_by_peer|
    rpcs_by_method[rpc_name] = LoadBalancerStatsResponse::RpcsByPeer.new(
      rpcs_by_peer: rpcs_by_peer
    )
  end
  LoadBalancerStatsResponse.new(
    rpcs_by_method: rpcs_by_method,
    rpcs_by_peer: watcher['rpcs_by_peer'],
    num_failures: watcher['no_remote_peer'] + watcher['rpcs_needed']
  )
end

#half_duplex_call(reqs) ⇒ Object



207
208
209
210
211
# File 'src/ruby/pb/test/server.rb', line 207

def half_duplex_call(reqs)
  # TODO: update with unique behaviour of the half_duplex_call if that's
  # ever required by any of the tests.
  full_duplex_call(reqs)
end

#streaming_input_call(call) ⇒ Object



187
188
189
190
191
# File 'src/ruby/pb/test/server.rb', line 187

def streaming_input_call(call)
  sizes = call.each_remote_read.map { |x| x.payload.body.length }
  sum = sizes.inject(0) { |s, x| s + x }
  StreamingInputCallResponse.new(aggregated_payload_size: sum)
end

#streaming_output_call(req, _call) ⇒ Object



193
194
195
196
197
198
199
# File 'src/ruby/pb/test/server.rb', line 193

def streaming_output_call(req, _call)
  cls = StreamingOutputCallResponse
  req.response_parameters.map do |p|
    cls.new(payload: Payload.new(type: req.response_type,
                                 body: nulls(p.size)))
  end
end

#unary_call(simple_req, _call) ⇒ Object



179
180
181
182
183
184
185
# File 'src/ruby/pb/test/server.rb', line 179

def unary_call(simple_req, _call)
  (_call)
  maybe_echo_status_and_message(simple_req)
  req_size = simple_req.response_size
  SimpleResponse.new(payload: Payload.new(type: :COMPRESSABLE,
                                          body: nulls(req_size)))
end