class TestTarget

A runnable implementation of the schema-specified testing service, with each service method implemented as required by the interop testing spec.

This class also implements the LoadBalancerStatsService required by the test runner.

Public Instance Methods

empty_call(_empty, _call) click to toggle source
# File src/ruby/pb/test/server.rb, line 175
# Handler for the empty-call RPC.
#
# Both arguments (the incoming message and the call object) are ignored;
# the reply is always a freshly constructed Empty message.
def empty_call(_empty, _call)
  Empty.new
end
full_duplex_call(reqs, _call) click to toggle source
# File src/ruby/pb/test/server.rb, line 201
# Handler for the full-duplex (bidirectional streaming) RPC.
#
# reqs  - lazy Enumerator over the requests sent by the client.
# _call - call object; handed to maybe_echo_metadata before any reply is
#         produced.
#
# Returns whatever FullDuplexEnumerator#each_item yields for the request
# stream (presumably an Enumerator of responses - confirm against
# FullDuplexEnumerator).
def full_duplex_call(reqs, _call)
  maybe_echo_metadata(_call)
  # The request stream is consumed lazily by the enumerator wrapper.
  responder = FullDuplexEnumerator.new(reqs)
  responder.each_item
end
get_client_accumulated_stats(req, _call) click to toggle source
# File src/ruby/pb/test/xds_client.rb, line 174
# Builds a LoadBalancerAccumulatedStatsResponse from the global accumulated
# counters.
#
# Everything is read while holding $accumulated_stats_mu. Each entry of
# $accumulated_method_stats is converted into a
# LoadBalancerAccumulatedStatsResponse::MethodStats carrying its
# rpcs_started and result values; the three $num_rpcs_*_by_method globals
# are passed through unchanged.
#
# req   - unused.
# _call - unused.
def get_client_accumulated_stats(req, _call)
  $accumulated_stats_mu.synchronize do
    per_method = {}
    $accumulated_method_stats.each_with_object(per_method) do |(rpc, stats), acc|
      acc[rpc] = LoadBalancerAccumulatedStatsResponse::MethodStats.new(
        rpcs_started: stats.rpcs_started,
        result: stats.result
      )
    end
    LoadBalancerAccumulatedStatsResponse.new(
      num_rpcs_started_by_method: $num_rpcs_started_by_method,
      num_rpcs_succeeded_by_method: $num_rpcs_succeeded_by_method,
      num_rpcs_failed_by_method: $num_rpcs_failed_by_method,
      stats_per_method: per_method
    )
  end
end
get_client_stats(req, _call) click to toggle source
# File src/ruby/pb/test/xds_client.rb, line 139
# Registers a watcher, waits until it has seen the requested number of RPCs
# or the timeout expires, and reports what the watcher collected.
#
# req   - request readable via req['num_rpcs'] (how many RPCs to wait for)
#         and req['timeout_sec'] (maximum time to wait, in seconds).
# _call - unused.
#
# The watcher is a Hash appended to the global $watchers list; it is
# presumably filled in and its 'rpcs_needed' decremented by the code that
# issues RPCs, which is not part of this method - confirm against the
# caller side.
#
# Returns a LoadBalancerStatsResponse whose num_failures is the watcher's
# 'no_remote_peer' count plus any RPCs still outstanding ('rpcs_needed')
# when waiting stopped.
def get_client_stats(req, _call)
  # Monotonic clock so the deadline is unaffected by wall-clock changes.
  finish_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) +
                req['timeout_sec']
  # Declared outside the block so it is still visible after synchronize.
  watcher = {}
  $watchers_mutex.synchronize do
    watcher = {
      "rpcs_by_method" => Hash.new(),
      "rpcs_by_peer" => Hash.new(0),
      "rpcs_needed" => req['num_rpcs'],
      "no_remote_peer" => 0
    }
    $watchers << watcher
    seconds_remaining = finish_time -
                        Process.clock_gettime(Process::CLOCK_MONOTONIC)
    # Re-check the condition after every wake-up; the remaining time is
    # recomputed so repeated wake-ups cannot extend the overall deadline.
    while watcher['rpcs_needed'] > 0 && seconds_remaining > 0
      $watchers_cv.wait($watchers_mutex, seconds_remaining)
      seconds_remaining = finish_time -
                          Process.clock_gettime(Process::CLOCK_MONOTONIC)
    end
    # Remove this exact watcher by object identity. Looking it up with
    # Array#index compares Hashes by content, so a different watcher with
    # equal contents could be removed instead of this one.
    $watchers.delete_if { |w| w.equal?(watcher) }
  end
  # convert results into proper proto object
  rpcs_by_method = {}
  watcher['rpcs_by_method'].each do |rpc_name, rpcs_by_peer|
    rpcs_by_method[rpc_name] = LoadBalancerStatsResponse::RpcsByPeer.new(
      rpcs_by_peer: rpcs_by_peer
    )
  end
  LoadBalancerStatsResponse.new(
    rpcs_by_method: rpcs_by_method,
    rpcs_by_peer: watcher['rpcs_by_peer'],
    num_failures: watcher['no_remote_peer'] + watcher['rpcs_needed']
  )
end
half_duplex_call(reqs) click to toggle source
# File src/ruby/pb/test/server.rb, line 207
# Handler for the half-duplex streaming RPC.
#
# reqs - Enumerator over the requests sent by the client.
#
# Responds exactly like the streaming part of full_duplex_call. This method
# receives no call object, so it cannot delegate to
# full_duplex_call(reqs, _call): invoking that with a single argument
# raises ArgumentError. The enumerator is therefore built here directly,
# and the metadata echo (which needs the call object) is not performed.
#
# Returns the result of FullDuplexEnumerator#each_item for the stream.
def half_duplex_call(reqs)
  # TODO: update with unique behaviour of the half_duplex_call if that's
  # ever required by any of the tests.
  FullDuplexEnumerator.new(reqs).each_item
end
streaming_input_call(call) click to toggle source
# File src/ruby/pb/test/server.rb, line 187
# Handler for the client-streaming RPC.
#
# call - call object; call.each_remote_read enumerates the messages the
#        client sends.
#
# Adds up the payload body length of every received message and returns a
# StreamingInputCallResponse with that total as aggregated_payload_size
# (0 when no messages arrive).
def streaming_input_call(call)
  total_bytes = call.each_remote_read.reduce(0) do |running, msg|
    running + msg.payload.body.length
  end
  StreamingInputCallResponse.new(aggregated_payload_size: total_bytes)
end
streaming_output_call(req, _call) click to toggle source
# File src/ruby/pb/test/server.rb, line 193
# Handler for the server-streaming RPC.
#
# req   - request; req.response_parameters lists one entry per response to
#         send, and req.response_type is copied into every payload.
# _call - unused.
#
# Returns one StreamingOutputCallResponse per response parameter, each with
# a payload body of nulls(size) for that parameter's size (nulls is defined
# elsewhere; presumably a string of that many NUL bytes - confirm).
def streaming_output_call(req, _call)
  req.response_parameters.map do |param|
    body = nulls(param.size)
    payload = Payload.new(type: req.response_type, body: body)
    StreamingOutputCallResponse.new(payload: payload)
  end
end
unary_call(simple_req, _call) click to toggle source
# File src/ruby/pb/test/server.rb, line 179
# Handler for the unary RPC.
#
# simple_req - request; simple_req.response_size sets the size of the reply
#              payload body.
# _call      - call object; handed to maybe_echo_metadata.
#
# Runs maybe_echo_metadata and then maybe_echo_status_and_message (both
# defined elsewhere) before building the reply, in that order.
#
# Returns a SimpleResponse whose payload has type :COMPRESSABLE and a body
# of nulls(response_size).
def unary_call(simple_req, _call)
  maybe_echo_metadata(_call)
  maybe_echo_status_and_message(simple_req)
  body = nulls(simple_req.response_size)
  payload = Payload.new(type: :COMPRESSABLE, body: body)
  SimpleResponse.new(payload: payload)
end