class Object
Constants
- ALGORITHMS
Names of supported compression algorithms
- AUTH_ENV
- ActiveCall
The
ActiveCall
class provides simple methods for sending marshallable data to a call- Args
Args
is used to hold the command line info.- BidiStub
- COMPRESS_LEVELS
Names of valid supported compression levels
- Call
- CallCredentials
- CallError
- CallOps
- ChannelCredentials
- CheckCallAfterFinishedServiceStub
- CheckerStub
- Creds
- DebugMessageTestServiceStub
- Dsl
- EchoStub
- FailingStub
- GenericService
Provides behaviour used to implement schema-derived service classes.
Is intended to be used to support both client and server IDL-schema-derived servers.
- GoogleRpcStatusTestStub
- HCReq
- HCResp
- INTERNAL
- NoProtoStub
- NoStatusDetailsBinTestServiceStub
- OK
- Pool
Pool
is a simple thread pool.- RpcDesc
RpcDesc
is a Descriptor of an RPC method.- RpcServer
RpcServer
hosts a number of services and makes them available on the network.- Server
- ServingStatus
- SlowStub
- SslTestServiceStub
- StatusCodes
StatusCodes
defines the canonical error codes used by gRPC for the RPC API.- Stream
- SynchronizedCancellationStub
- TEST_DEBUG_MESSAGE
- TEST_WRITE_FLAGS
- TimeConsts
TimeConsts
is a module from the C extension.Here it's re-opened to add a utility func.
- UNKNOWN
- UserAgentEchoServiceStub
- WriteFlags
- XdsCreds
Public Instance Methods
# File src/ruby/pb/test/client.rb, line 751 def _check_args(args) %w(server_host server_port test_case).each do |a| if args[a].nil? fail(OptionParser::MissingArgument, "please specify --#{a}") end end args end
# File src/ruby/spec/generic/rpc_desc_spec.rb, line 361 def arg_error_msg(error = nil) error ||= ArgumentError.new('other error') "#{error.class}: #{error.message}" end
Fails with AssertionError
if the block does evaluate to true
# File src/ruby/pb/test/client.rb, line 66 def assert(msg = 'unknown cause') fail 'No assertion block provided' unless block_given? fail AssertionError, msg unless yield end
# File src/ruby/spec/generic/rpc_desc_spec.rb, line 337 def bad_status(_req, _call) fail GRPC::BadStatus.new(@bs_code, 'NOK') end
# File src/ruby/spec/generic/rpc_desc_spec.rb, line 345 def bad_status_alt(_call) fail GRPC::BadStatus.new(@bs_code, 'NOK') end
# File src/ruby/spec/pb/duplicate/codegen_spec.rb, line 19 def can_run_codegen_check system('which grpc_ruby_plugin') && system('which protoc') end
# File src/ruby/spec/generic/rpc_server_spec.rb, line 22 def check_md(wanted_md, received_md) wanted_md.zip(received_md).each do |w, r| w.each do |key, value| expect(r[key]).to eq(value) end end end
# File src/ruby/spec/generic/rpc_server_spec.rb, line 668 def check_multi_req_view_of_finished_call(call) common_check_of_finished_server_call(call) expect do call.each_remote_read.each { |r| p r } end.to raise_error(GRPC::Core::CallError) end
check that methods on a finished/closed call t crash
# File src/ruby/spec/generic/client_stub_spec.rb, line 40 def check_op_view_of_finished_client_call(op_view, expected_metadata, expected_trailing_metadata) # use read_response_stream to try to iterate through # possible response stream fail('need something to attempt reads') unless block_given? expect do resp = op_view.execute yield resp end.to raise_error(GRPC::Core::CallError) expect { op_view.start_call }.to raise_error(RuntimeError) sanity_check_values_of_accessors(op_view, expected_metadata, expected_trailing_metadata) expect do op_view.wait op_view.cancel op_view.write_flag = 1 end.to_not raise_error end
check that the server-side call is still in a usable state even after it has finished
# File src/ruby/spec/generic/rpc_server_spec.rb, line 661 def check_single_req_view_of_finished_call(call) common_check_of_finished_server_call(call) expect(call.peer).to be_a(String) expect(call.peer_cert).to be(nil) end
# File src/ruby/spec/client_server_spec.rb, line 277 def client_cancel_test(cancel_proc, expected_code, expected_details) call = new_client_call server_call = nil server_thread = Thread.new do server_call = server_allows_client_to_proceed end client_ops = { CallOps::SEND_INITIAL_METADATA => {}, CallOps::RECV_INITIAL_METADATA => nil } client_batch = call.run_batch(client_ops) expect(client_batch.send_metadata).to be true expect(client_batch.metadata).to eq({}) cancel_proc.call(call) server_thread.join server_ops = { CallOps::RECV_CLOSE_ON_SERVER => nil } server_batch = server_call.run_batch(server_ops) expect(server_batch.send_close).to be true client_ops = { CallOps::RECV_STATUS_ON_CLIENT => {} } client_batch = call.run_batch(client_ops) expect(client_batch.status.code).to be expected_code expect(client_batch.status.details).to eq expected_details end
# File src/ruby/spec/client_auth_spec.rb, line 24 def client_cert test_root = File.join(File.dirname(__FILE__), 'testdata') cert = File.open(File.join(test_root, 'client.pem')).read fail unless cert.is_a?(String) cert end
# File src/ruby/spec/generic/client_stub_spec.rb, line 86 def close_active_server_call(active_server_call) active_server_call.send(:set_input_stream_done) active_server_call.send(:set_output_stream_done) end
# File src/ruby/spec/generic/rpc_server_spec.rb, line 676 def common_check_of_finished_server_call(call) expect do call.merge_metadata_to_send({}) end.to raise_error(RuntimeError) expect do call.send_initial_metadata end.to_not raise_error expect(call.cancelled?).to be(false) expect(call.metadata).to be_a(Hash) expect(call.metadata['user-agent']).to be_a(String) expect(call.metadata_sent).to be(true) expect(call.output_metadata).to eq({}) expect(call.metadata_to_send).to eq({}) expect(call.deadline.is_a?(Time)).to be(true) end
# File src/ruby/spec/channel_spec.rb, line 116 def construct_with_args(a) proc { GRPC::Core::Channel.new('phony_host', a, create_test_cert) } end
# File src/ruby/spec/client_auth_spec.rb, line 17 def create_channel_creds test_root = File.join(File.dirname(__FILE__), 'testdata') files = ['ca.pem', 'client.key', 'client.pem'] creds = files.map { |f| File.open(File.join(test_root, f)).read } GRPC::Core::ChannelCredentials.new(creds[0], creds[1], creds[2]) end
# File src/ruby/spec/generic/client_stub_spec.rb, line 1059 def create_secure_test_server certs = load_test_certs secure_credentials = GRPC::Core::ServerCredentials.new( nil, [{ private_key: certs[1], cert_chain: certs[2] }], false) @server = new_core_server_for_testing(nil) @server.add_http2_port('0.0.0.0:0', secure_credentials) end
# File src/ruby/spec/client_auth_spec.rb, line 31 def create_server_creds test_root = File.join(File.dirname(__FILE__), 'testdata') GRPC.logger.info("test root: #{test_root}") files = ['ca.pem', 'server1.key', 'server1.pem'] creds = files.map { |f| File.open(File.join(test_root, f)).read } GRPC::Core::ServerCredentials.new( creds[0], [{ private_key: creds[1], cert_chain: creds[2] }], true) # force client auth end
creates a test stub that accesses host:port securely.
# File src/ruby/pb/test/client.rb, line 97 def create_stub(opts) address = "#{opts.server_host}:#{opts.server_port}" # Provide channel args that request compression by default # for compression interop tests if ['client_compressed_unary', 'client_compressed_streaming'].include?(opts.test_case) compression_options = GRPC::Core::CompressionOptions.new(default_algorithm: :gzip) compression_channel_args = compression_options.to_channel_arg_hash else compression_channel_args = {} end if opts.secure creds = ssl_creds(opts.use_test_ca) stub_opts = { channel_args: {} } unless opts.server_host_override.empty? stub_opts[:channel_args].merge!({ GRPC::Core::Channel::SSL_TARGET => opts.server_host_override }) end # Add service account creds if specified wants_creds = %w(all compute_engine_creds service_account_creds) if wants_creds.include?(opts.test_case) unless opts.oauth_scope.nil? auth_creds = Google::Auth.get_application_default(opts.oauth_scope) call_creds = GRPC::Core::CallCredentials.new(auth_creds.updater_proc) creds = creds.compose call_creds end end if opts.test_case == 'oauth2_auth_token' auth_creds = Google::Auth.get_application_default(opts.oauth_scope) kw = auth_creds.updater_proc.call({}) # gives as an auth token # use a metadata update proc that just adds the auth token. call_creds = GRPC::Core::CallCredentials.new(proc { |md| md.merge(kw) }) creds = creds.compose call_creds end if opts.test_case == 'jwt_token_creds' # don't use a scope auth_creds = Google::Auth.get_application_default call_creds = GRPC::Core::CallCredentials.new(auth_creds.updater_proc) creds = creds.compose call_creds end GRPC.logger.info("... connecting securely to #{address}") stub_opts[:channel_args].merge!(compression_channel_args) if opts.test_case == "unimplemented_service" Grpc::Testing::UnimplementedService::Stub.new(address, creds, **stub_opts) else Grpc::Testing::TestService::Stub.new(address, creds, **stub_opts) end else GRPC.logger.info("... connecting insecurely to #{address}") if opts.test_case == "unimplemented_service" Grpc::Testing::UnimplementedService::Stub.new( address, :this_channel_is_insecure, channel_args: compression_channel_args ) else Grpc::Testing::TestService::Stub.new( address, :this_channel_is_insecure, channel_args: compression_channel_args ) end end end
# File src/ruby/spec/channel_spec.rb, line 27 def create_test_cert GRPC::Core::ChannelCredentials.new(load_test_certs[0]) end
# File src/ruby/spec/generic/client_stub_spec.rb, line 1068 def create_test_server @server = new_core_server_for_testing(nil) @server.add_http2_port('0.0.0.0:0', :this_port_is_insecure) end
# File src/ruby/spec/client_server_spec.rb, line 597 def credentials_update_test(creds_update_md) auth_proc = proc { creds_update_md } call_creds = GRPC::Core::CallCredentials.new(auth_proc) initial_md_key = 'k2' initial_md_val = 'v2' initial_md = { initial_md_key => initial_md_val } expected_md = creds_update_md.clone fail 'bad test param' unless expected_md[initial_md_key].nil? expected_md[initial_md_key] = initial_md_val recvd_rpc = nil rcv_thread = Thread.new do recvd_rpc = @server.request_call end call = new_client_call call.set_credentials! call_creds client_batch = call.run_batch( CallOps::SEND_INITIAL_METADATA => initial_md, CallOps::SEND_CLOSE_FROM_CLIENT => nil) expect(client_batch.send_metadata).to be true expect(client_batch.send_close).to be true # confirm the server can receive the client metadata rcv_thread.join expect(recvd_rpc).to_not eq nil recvd_md = recvd_rpc.metadata replace_symbols = Hash[expected_md.each_pair.collect { |x, y| [x.to_s, y] }] expect(recvd_md).to eq(recvd_md.merge(replace_symbols)) credentials_update_test_finish_call(call, recvd_rpc.call) end
# File src/ruby/spec/client_server_spec.rb, line 632 def credentials_update_test_finish_call(client_call, server_call) final_server_batch = server_call.run_batch( CallOps::RECV_CLOSE_ON_SERVER => nil, CallOps::SEND_INITIAL_METADATA => nil, CallOps::SEND_STATUS_FROM_SERVER => ok_status) expect(final_server_batch.send_close).to be(true) expect(final_server_batch.send_metadata).to be(true) expect(final_server_batch.send_status).to be(true) final_client_batch = client_call.run_batch( CallOps::RECV_INITIAL_METADATA => nil, CallOps::RECV_STATUS_ON_CLIENT => nil) expect(final_client_batch.metadata).to eq({}) expect(final_client_batch.status.code).to eq(0) end
# File src/ruby/spec/call_spec.rb, line 177 def deadline Time.now + 2 # in 2 seconds; arbitrary end
# File src/ruby/bin/math_client.rb, line 44 def do_div(stub) GRPC.logger.info('request_response') GRPC.logger.info('----------------') req = Math::DivArgs.new(dividend: 7, divisor: 3) GRPC.logger.info("div(7/3): req=#{req.inspect}") resp = stub.div(req) GRPC.logger.info("Answer: #{resp.inspect}") GRPC.logger.info('----------------') end
# File src/ruby/bin/math_client.rb, line 77 def do_div_many(stub) GRPC.logger.info('bidi_streamer') GRPC.logger.info('-------------') reqs = [] reqs << Math::DivArgs.new(dividend: 7, divisor: 3) reqs << Math::DivArgs.new(dividend: 5, divisor: 2) reqs << Math::DivArgs.new(dividend: 7, divisor: 2) GRPC.logger.info("div(7/3), div(5/2), div(7/2): reqs=#{reqs.inspect}") resp = stub.div_many(reqs) resp.each do |r| GRPC.logger.info("Answer: #{r.inspect}") end GRPC.logger.info('----------------') end
# File src/ruby/bin/math_client.rb, line 65 def do_fib(stub) GRPC.logger.info('server_streamer') GRPC.logger.info('----------------') req = Math::FibArgs.new(limit: 11) GRPC.logger.info("fib(11): req=#{req.inspect}") resp = stub.fib(req) resp.each do |r| GRPC.logger.info("Answer: #{r.inspect}") end GRPC.logger.info('----------------') end
# File src/ruby/bin/math_client.rb, line 54 def do_sum(stub) # to make client streaming requests, pass an enumerable of the inputs GRPC.logger.info('client_streamer') GRPC.logger.info('---------------') reqs = [1, 2, 3, 4, 5].map { |x| Math::Num.new(num: x) } GRPC.logger.info("sum(1, 2, 3, 4, 5): reqs=#{reqs.inspect}") resp = stub.sum(reqs) # reqs.is_a?(Enumerable) GRPC.logger.info("Answer: #{resp.inspect}") GRPC.logger.info('---------------') end
# File src/ruby/ext/grpc/extconf.rb, line 45 def env_append(name, string) ENV[name] ||= '' ENV[name] += ' ' + string end
# File src/ruby/ext/grpc/extconf.rb, line 33 def env_unset?(name) ENV[name].nil? || ENV[name].size == 0 end
# File src/ruby/pb/test/xds_client.rb, line 193 def execute_rpc_in_thread(op, rpc) Thread.new { rpc_stats_key = rpc.to_s remote_peer = "" begin op.execute if op.metadata.key?('hostname') remote_peer = op.metadata['hostname'] end $accumulated_stats_mu.synchronize do $num_rpcs_succeeded_by_method[rpc_stats_key] += 1 $accumulated_method_stats[rpc_stats_key].add_result(0) end rescue GRPC::BadStatus => e $accumulated_stats_mu.synchronize do $num_rpcs_failed_by_method[rpc_stats_key] += 1 $accumulated_method_stats[rpc_stats_key].add_result(e.code) end end $thread_results_mu.synchronize do $thread_results << [rpc, remote_peer] end } end
# File src/ruby/spec/generic/active_call_spec.rb, line 667 def expect_server_to_be_invoked(**kw) recvd_rpc = @received_rpcs_queue.pop expect(recvd_rpc).to_not eq nil recvd_call = recvd_rpc.call recvd_call.run_batch(CallOps::SEND_INITIAL_METADATA => kw) ActiveCall.new(recvd_call, @pass_through, @pass_through, deadline, metadata_received: true, started: true) end
# File src/ruby/spec/generic/active_call_spec.rb, line 661 def expect_server_to_receive(sent_text, **kw) c = expect_server_to_be_invoked(**kw) expect(c.remote_read).to eq(sent_text) c end
# File src/ruby/spec/generic/rpc_desc_spec.rb, line 333 def fake_bidistream(an_array) an_array end
# File src/ruby/spec/generic/rpc_desc_spec.rb, line 204 def fake_clstream(_arg) end
# File src/ruby/spec/generic/rpc_desc_spec.rb, line 321 def fake_reqresp(_req, _call) @ok_response end
# File src/ruby/spec/generic/rpc_desc_spec.rb, line 207 def fake_svstream(_arg1, _arg2) end
# File src/ruby/spec/generic/rpc_desc_spec.rb, line 210 def fake_three_args(_arg1, _arg2, _arg3) end
# File src/ruby/spec/channel_spec.rb, line 31 def fork_with_propagated_error_message pipe_read, pipe_write = IO.pipe pid = fork do pipe_read.close begin yield rescue => exc pipe_write.syswrite(exc.message) end pipe_write.close end pipe_write.close exc_message = pipe_read.read Process.wait(pid) unless $CHILD_STATUS.success? raise "forked process failed with #{$CHILD_STATUS}" end raise exc_message unless exc_message.empty? end
# File src/ruby/spec/generic/client_stub_spec.rb, line 295 def get_response(stub, credentials: nil) GRPC.logger.info(credentials.inspect) stub.request_response(@method, @sent_msg, noop, noop, metadata: @metadata, credentials: credentials) end
# File src/ruby/spec/generic/client_stub_spec.rb, line 517 def get_responses(stub, unmarshal: noop) e = stub.server_streamer(@method, @sent_msg, noop, unmarshal, metadata: @metadata) expect(e).to be_a(Enumerator) e end
# File src/ruby/ext/grpc/extconf.rb, line 41 def inherit_rbconfig(name) ENV[name] = RbConfig::CONFIG[name] if env_unset?(name) && rbconfig_set?(name) end
# File src/ruby/spec/generic/active_call_spec.rb, line 36 def inner_call_of_active_call(active_call) active_call.instance_variable_get(:@call) end
# File src/ruby/bin/math_client.rb, line 92 def load_test_certs this_dir = File.expand_path(File.dirname(__FILE__)) data_dir = File.join(File.dirname(this_dir), 'spec/testdata') files = ['ca.pem', 'server1.key', 'server1.pem'] files.map { |f| File.open(File.join(data_dir, f)).read } end
# File src/ruby/bin/math_client.rb, line 104 def main options = { 'host' => 'localhost:7071', 'secure' => false } OptionParser.new do |opts| opts.banner = 'Usage: [--host <hostname>:<port>] [--secure|-s]' opts.on('--host HOST', '<hostname>:<port>') do |v| options['host'] = v end opts.on('-s', '--secure', 'access using test creds') do |v| options['secure'] = v end end.parse! # The Math::Math:: module occurs because the service has the same name as its # package. That practice should be avoided by defining real services. if options['secure'] stub_opts = { :creds => test_creds, GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr', timeout: INFINITE_FUTURE, } stub = Math::Math::Stub.new(options['host'], **stub_opts) GRPC.logger.info("... connecting securely on #{options['host']}") else stub = Math::Math::Stub.new(options['host'], :this_channel_is_insecure, timeout: INFINITE_FUTURE) GRPC.logger.info("... connecting insecurely on #{options['host']}") end do_div(stub) do_sum(stub) do_fib(stub) do_div_many(stub) end
# File src/ruby/spec/call_spec.rb, line 173 def make_test_call @ch.create_call(nil, nil, 'phony_method', nil, deadline) end
# File src/ruby/pb/test/server.rb, line 117 def maybe_echo_metadata(_call) # these are consistent for all interop tests initial_metadata_key = "x-grpc-test-echo-initial" trailing_metadata_key = "x-grpc-test-echo-trailing-bin" if _call.metadata.has_key?(initial_metadata_key) _call.metadata_to_send[initial_metadata_key] = _call.metadata[initial_metadata_key] end if _call.metadata.has_key?(trailing_metadata_key) _call.output_metadata[trailing_metadata_key] = _call.metadata[trailing_metadata_key] end end
# File src/ruby/pb/test/server.rb, line 131 def maybe_echo_status_and_message(req) unless req.response_status.nil? fail GRPC::BadStatus.new_status_exception( req.response_status.code, req.response_status.message) end end
# File src/ruby/spec/generic/client_stub_spec.rb, line 181 def metadata_test(md) server_port = create_test_server host = "localhost:#{server_port}" th = run_request_response(@sent_msg, @resp, @pass, expected_metadata: md) stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) @metadata = md expect(get_response(stub)).to eq(@resp) th.join end
# File src/ruby/spec/client_server_spec.rb, line 37 def new_client_call @ch.create_call(nil, nil, '/method', nil, deadline) end
# File src/ruby/spec/generic/rpc_desc_spec.rb, line 201 def no_arg end
# File src/ruby/spec/generic/rpc_desc_spec.rb, line 353 def not_implemented(_req, _call) fail not_implemented_error end
# File src/ruby/spec/generic/rpc_desc_spec.rb, line 357 def not_implemented_alt(_call) fail not_implemented_error end
# File src/ruby/spec/generic/rpc_desc_spec.rb, line 366 def not_implemented_error NotImplementedError.new('some OS feature not implemented') end
# File src/ruby/spec/generic/rpc_desc_spec.rb, line 370 def not_implemented_error_msg(error = nil) error ||= not_implemented_error "#{error.class}: #{error.message}" end
produces a string of null chars (0) of length l.
# File src/ruby/pb/test/client.rb, line 173 def nulls(l) fail 'requires #{l} to be +ve' if l < 0 [].pack('x' * l).force_encoding('ascii-8bit') end
# File src/ruby/spec/client_server_spec.rb, line 41 def ok_status Struct::Status.new(StatusCodes::OK, 'OK') end
# File src/ruby/spec/generic/rpc_desc_spec.rb, line 341 def other_error(_req, _call) fail(ArgumentError, 'other error') end
# File src/ruby/spec/generic/rpc_desc_spec.rb, line 349 def other_error_alt(_call) fail(ArgumentError, 'other error') end
validates the command line options, returning them as a Hash.
# File src/ruby/pb/test/client.rb, line 714 def parse_args args = Args.new args.server_host_override = '' OptionParser.new do |opts| opts.on('--oauth_scope scope', 'Scope for OAuth tokens') { |v| args['oauth_scope'] = v } opts.on('--server_host SERVER_HOST', 'server hostname') do |v| args['server_host'] = v end opts.on('--default_service_account email_address', 'email address of the default service account') do |v| args['default_service_account'] = v end opts.on('--server_host_override HOST_OVERRIDE', 'override host via a HTTP header') do |v| args['server_host_override'] = v end opts.on('--server_port SERVER_PORT', 'server port') do |v| args['server_port'] = v end # instance_methods(false) gives only the methods defined in that class test_cases = NamedTests.instance_methods(false).map(&:to_s) test_case_list = test_cases.join(',') opts.on('--test_case CODE', test_cases, {}, 'select a test_case', " (#{test_case_list})") { |v| args['test_case'] = v } opts.on('--use_tls USE_TLS', ['false', 'true'], 'require a secure connection?') do |v| args['secure'] = v == 'true' end opts.on('--use_test_ca USE_TEST_CA', ['false', 'true'], 'if secure, use the test certificate?') do |v| args['use_test_ca'] = v == 'true' end end.parse! _check_args(args) end
validates the command line options, returning them as a Hash.
# File src/ruby/pb/test/server.rb, line 215 def parse_options options = { 'port' => nil, 'secure' => false } OptionParser.new do |opts| opts.banner = 'Usage: --port port' opts.on('--port PORT', 'server port') do |v| options['port'] = v end opts.on('--use_tls USE_TLS', ['false', 'true'], 'require a secure connection?') do |v| options['secure'] = v == 'true' end end.parse! if options['port'].nil? fail(OptionParser::MissingArgument, 'please specify --port') end options end
creates SSL Credentials from the production certificates.
# File src/ruby/pb/test/client.rb, line 86 def prod_creds GRPC::Core::ChannelCredentials.new() end
# File src/ruby/ext/grpc/extconf.rb, line 37 def rbconfig_set?(name) RbConfig::CONFIG[name] && RbConfig::CONFIG[name].size > 0 end
# File src/ruby/spec/generic/client_stub_spec.rb, line 975 def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts, expected_metadata: {}, server_initial_md: {}, server_trailing_md: {}) wanted_metadata = expected_metadata.clone wakey_thread do |notifier| c = expect_server_to_be_invoked( notifier, metadata_to_send: server_initial_md) wanted_metadata.each do |k, v| expect(c.metadata[k.to_s]).to eq(v) end expected_inputs.each do |i| if client_starts expect(c.remote_read).to eq(i) c.remote_send(i) else c.remote_send(i) expect(c.remote_read).to eq(i) end end c.send_status(status, status == @pass ? 'OK' : 'NOK', true, metadata: server_trailing_md) close_active_server_call(c) end end
# File src/ruby/spec/generic/client_stub_spec.rb, line 964 def run_bidi_streamer_handle_inputs_first(expected_inputs, replys, status) wakey_thread do |notifier| c = expect_server_to_be_invoked(notifier) expected_inputs.each { |i| expect(c.remote_read).to eq(i) } replys.each { |r| c.remote_send(r) } c.send_status(status, status == @pass ? 'OK' : 'NOK', true) close_active_server_call(c) end end
# File src/ruby/spec/generic/client_stub_spec.rb, line 1001 def run_client_streamer(expected_inputs, resp, status, expected_metadata: {}, server_initial_md: {}, server_trailing_md: {}) wanted_metadata = expected_metadata.clone wakey_thread do |notifier| c = expect_server_to_be_invoked( notifier, metadata_to_send: server_initial_md) expected_inputs.each { |i| expect(c.remote_read).to eq(i) } wanted_metadata.each do |k, v| expect(c.metadata[k.to_s]).to eq(v) end c.remote_send(resp) c.send_status(status, status == @pass ? 'OK' : 'NOK', true, metadata: server_trailing_md) close_active_server_call(c) end end
# File src/ruby/spec/generic/client_stub_spec.rb, line 713 def run_error_in_client_request_stream_test(requests_to_push, expected_error_message) # start a server that waits on a read indefinitely - it should # see a cancellation and be able to break out th = Thread.new { run_server_bidi_send_one_then_read_indefinitely } stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) request_queue = Queue.new @sent_msgs = BidiErrorTestingEnumerateForeverQueue.new(request_queue) verify_error_from_write_thread(stub, requests_to_push, request_queue, expected_error_message) # the write loop errror should cancel the call and end the # server's request stream th.join end
# File src/ruby/spec/generic/client_stub_spec.rb, line 324 def run_op_view_metadata_test(run_start_call_first) server_port = create_test_server host = "localhost:#{server_port}" @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' } @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' } th = run_request_response( @sent_msg, @resp, @pass, expected_metadata: @metadata, server_initial_md: @server_initial_md, server_trailing_md: @server_trailing_md) stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) expect( get_response(stub, run_start_call_first: run_start_call_first)).to eq(@resp) th.join end
# File src/ruby/spec/generic/client_stub_spec.rb, line 1040 def run_request_response(expected_input, resp, status, expected_metadata: {}, server_initial_md: {}, server_trailing_md: {}) wanted_metadata = expected_metadata.clone wakey_thread do |notifier| c = expect_server_to_be_invoked( notifier, metadata_to_send: server_initial_md) expect(c.remote_read).to eq(expected_input) wanted_metadata.each do |k, v| expect(c.metadata[k.to_s]).to eq(v) end c.remote_send(resp) c.send_status(status, status == @pass ? 'OK' : 'NOK', true, metadata: server_trailing_md) close_active_server_call(c) end end
# File src/ruby/spec/generic/client_stub_spec.rb, line 894 def run_server_bidi_expect_client_to_cancel(wait_for_shutdown_ok_callback) @server.start recvd_rpc = @server.request_call recvd_call = recvd_rpc.call server_call = GRPC::ActiveCall.new( recvd_call, noop, noop, INFINITE_FUTURE, metadata_received: true, started: false) server_call.send_initial_metadata server_call.remote_send('server call received') wait_for_shutdown_ok_callback.call # since the client is cancelling the call, # we should be able to shut down cleanly @server.shutdown_and_notify(nil) @server.close end
# File src/ruby/spec/generic/client_stub_spec.rb, line 661 def run_server_bidi_send_one_then_read_indefinitely @server.start recvd_rpc = @server.request_call recvd_call = recvd_rpc.call server_call = GRPC::ActiveCall.new( recvd_call, noop, noop, INFINITE_FUTURE, metadata_received: true, started: false) server_call.send_initial_metadata server_call.remote_send('server response') loop do m = server_call.remote_read break if m.nil? end # can't fail since initial metadata already sent server_call.send_status(@pass, 'OK', true) close_active_server_call(server_call) end
# File src/ruby/spec/generic/client_stub_spec.rb, line 766 def run_server_bidi_shutdown_after_one_read @server.start recvd_rpc = @server.request_call recvd_call = recvd_rpc.call server_call = GRPC::ActiveCall.new( recvd_call, noop, noop, INFINITE_FUTURE, metadata_received: true, started: false) expect(server_call.remote_read).to eq('first message') @server.shutdown_and_notify(from_relative_time(0)) @server.close end
# File src/ruby/spec/generic/client_stub_spec.rb, line 804 def run_server_bidi_shutdown_after_one_write @server.start recvd_rpc = @server.request_call recvd_call = recvd_rpc.call server_call = GRPC::ActiveCall.new( recvd_call, noop, noop, INFINITE_FUTURE, metadata_received: true, started: false) server_call.send_initial_metadata server_call.remote_send('message') @server.shutdown_and_notify(from_relative_time(0)) @server.close end
# File src/ruby/spec/generic/client_stub_spec.rb, line 945 def run_server_streamer(expected_input, replys, status, expected_metadata: {}, server_initial_md: {}, server_trailing_md: {}) wanted_metadata = expected_metadata.clone wakey_thread do |notifier| c = expect_server_to_be_invoked( notifier, metadata_to_send: server_initial_md) wanted_metadata.each do |k, v| expect(c.metadata[k.to_s]).to eq(v) end expect(c.remote_read).to eq(expected_input) replys.each { |r| c.remote_send(r) } c.send_status(status, status == @pass ? 'OK' : 'NOK', true, metadata: server_trailing_md) close_active_server_call(c) end end
# File src/ruby/spec/generic/client_stub_spec.rb, line 1020 def run_server_streamer_handle_client_cancellation( expected_input, replys) wakey_thread do |notifier| c = expect_server_to_be_invoked(notifier) expect(c.remote_read).to eq(expected_input) begin replys.each { |r| c.remote_send(r) } rescue GRPC::Core::CallError # An attempt to write to the client might fail. This is ok # because the client call is expected to cancel the call, # and there is a race as for when the server-side call will # start to fail. p 'remote_send failed (allowed because call expected to cancel)' ensure c.send_status(OK, 'OK', true) close_active_server_call(c) end end end
send 1 rpc every 1/qps second
# File src/ruby/pb/test/xds_client.rb, line 219 def run_test_loop(stub, target_seconds_between_rpcs, fail_on_failed_rpcs) include Grpc::Testing simple_req = SimpleRequest.new() empty_req = Empty.new() target_next_start = Process.clock_gettime(Process::CLOCK_MONOTONIC) # Some RPCs are meant to be "kept open". Since Ruby does not have an # async API, we are executing those RPCs in a thread so that they don't # block. keep_open_threads = Array.new while !$shutdown now = Process.clock_gettime(Process::CLOCK_MONOTONIC) sleep_seconds = target_next_start - now if sleep_seconds < 0 target_next_start = now + target_seconds_between_rpcs else target_next_start += target_seconds_between_rpcs sleep(sleep_seconds) end deadline_sec = $rpc_config.timeout_sec > 0 ? $rpc_config.timeout_sec : 30 deadline = GRPC::Core::TimeConsts::from_relative_time(deadline_sec) results = {} $rpc_config.rpcs_to_send.each do |rpc| # rpc is in the form of :UNARY_CALL or :EMPTY_CALL here metadata = $rpc_config.metadata_to_send.key?(rpc) ? $rpc_config.metadata_to_send[rpc] : {} $accumulated_stats_mu.synchronize do $num_rpcs_started_by_method[rpc.to_s] += 1 $accumulated_method_stats[rpc.to_s].increment_rpcs_started() end if rpc == :UNARY_CALL op = stub.unary_call(simple_req, metadata: metadata, deadline: deadline, return_op: true) elsif rpc == :EMPTY_CALL op = stub.empty_call(empty_req, metadata: metadata, deadline: deadline, return_op: true) else raise "Unsupported rpc #{rpc}" end keep_open_threads << execute_rpc_in_thread(op, rpc) end # collect results from threads $thread_results_mu.synchronize do $thread_results.each do |r| rpc_name, remote_peer = r results[rpc_name] = remote_peer end $thread_results = Array.new end $watchers_mutex.synchronize do $watchers.each do |watcher| # this is counted once when each group of all rpcs_to_send were done watcher['rpcs_needed'] -= 1 results.each do |rpc_name, remote_peer| # These stats expect rpc_name to be in the form of # UnaryCall or EmptyCall, not the underscore-case all-caps form rpc_name = $RPC_MAP.invert()[rpc_name] if remote_peer.strip.empty? # error is counted per individual RPC watcher['no_remote_peer'] += 1 else if not watcher['rpcs_by_method'].key?(rpc_name) watcher['rpcs_by_method'][rpc_name] = Hash.new(0) end # increment the remote hostname distribution histogram # both by overall, and broken down per RPC watcher['rpcs_by_method'][rpc_name][remote_peer] += 1 watcher['rpcs_by_peer'][remote_peer] += 1 end end end $watchers_cv.broadcast end end keep_open_threads.each { |thd| thd.join } end
# File src/ruby/spec/generic/client_stub_spec.rb, line 64 def sanity_check_values_of_accessors(op_view, expected_metadata, expected_trailing_metadata) expected_status = Struct::Status.new expected_status.code = 0 expected_status.details = 'OK' expected_status.metadata = expected_trailing_metadata expect(op_view.status).to eq(expected_status) expect(op_view.metadata).to eq(expected_metadata) expect(op_view.trailing_metadata).to eq(expected_trailing_metadata) expect(op_view.cancelled?).to be(false) expect(op_view.write_flag).to be(nil) # The deadline attribute of a call can be either # a GRPC::Core::TimeSpec or a Time, which are mutually exclusive. # TODO: fix so that the accessor always returns the same type. expect(op_view.deadline.is_a?(GRPC::Core::TimeSpec) || op_view.deadline.is_a?(Time)).to be(true) end
# File src/ruby/spec/generic/active_call_spec.rb, line 29 def send_and_receive_close_and_status(client_call, server_call) client_call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) server_call.run_batch(CallOps::RECV_CLOSE_ON_SERVER => nil, CallOps::SEND_STATUS_FROM_SERVER => ok_status) client_call.run_batch(CallOps::RECV_STATUS_ON_CLIENT => nil) end
# File src/ruby/spec/client_server_spec.rb, line 27 def server_allows_client_to_proceed(metadata = {}) recvd_rpc = @server.request_call expect(recvd_rpc).to_not eq nil server_call = recvd_rpc.call ops = { CallOps::SEND_INITIAL_METADATA => metadata } server_batch = server_call.run_batch(ops) expect(server_batch.send_metadata).to be true server_call end
creates the SSL Credentials.
# File src/ruby/pb/test/client.rb, line 91 def ssl_creds(use_test_ca) return test_creds if use_test_ca prod_creds end
# File src/ruby/spec/server_spec.rb, line 225 def start_a_server s = new_core_server_for_testing(nil) s.add_http2_port('0.0.0.0:0', :this_port_is_insecure) s.start s end
# File src/ruby/spec/channel_connection_spec.rb, line 21 def start_server(port = 0) @srv = new_rpc_server_for_testing(pool_size: 1) server_port = @srv.add_http2_port("localhost:#{port}", :this_port_is_insecure) @srv.handle(EchoService) @server_thd = Thread.new { @srv.run } @srv.wait_till_running server_port end
# File src/ruby/spec/channel_connection_spec.rb, line 30 def stop_server expect(@srv.stopped?).to be(false) @srv.stop @server_thd.join expect(@srv.stopped?).to be(true) end
# File src/ruby/bin/math_client.rb, line 99 def test_creds certs = load_test_certs GRPC::Core::ChannelCredentials.new(certs[0]) end
# File src/ruby/bin/math_server.rb, line 157 def test_server_creds certs = load_test_certs GRPC::Core::ServerCredentials.new( nil, [{ private_key: certs[1], cert_chain: certs[2] }], false) end
convert upper snake-case to camel case. e.g., DEADLINE_EXCEEDED -> DeadlineExceeded
# File src/ruby/spec/error_sanity_spec.rb, line 22 def upper_snake_to_camel(name) name.to_s.split('_').map(&:downcase).map(&:capitalize).join('') end
# File src/ruby/spec/generic/client_stub_spec.rb, line 679 def verify_error_from_write_thread(stub, requests_to_push, request_queue, expected_description) # TODO: an improvement might be to raise the original exception from # bidi call write loops instead of only cancelling the call failing_marshal_proc = proc do |req| fail req if req.is_a?(StandardError) req end begin e = get_responses(stub, marshal_proc: failing_marshal_proc) first_response = e.next expect(first_response).to eq('server response') requests_to_push.each { |req| request_queue.push(req) } e.collect { |r| r } rescue GRPC::Unknown => e exception = e end expect(exception.message.include?(expected_description)).to be(true) end
# File src/ruby/spec/generic/client_stub_spec.rb, line 19 def wakey_thread(&blk) n = GRPC::Notifier.new t = Thread.new do blk.call(n) end t.abort_on_exception = true n.wait t end
# File src/ruby/spec/pb/codegen/package_option_spec.rb, line 73 def with_protos(file_paths) pb_dir = File.dirname(__FILE__) bins_dir = File.join('..', '..', '..', '..', '..', 'cmake', 'build') plugin = File.join(bins_dir, 'grpc_ruby_plugin') protoc = File.join(bins_dir, 'third_party', 'protobuf', 'protoc') # Generate the service from the proto Dir.mktmpdir(nil, File.dirname(__FILE__)) do |tmp_dir| gen_file = system(protoc, '-I.', *file_paths, "--grpc_out=#{tmp_dir}", # generate the service "--ruby_out=#{tmp_dir}", # generate the definitions "--plugin=protoc-gen-grpc=#{plugin}", chdir: pb_dir, out: File::NULL) expect(gen_file).to be_truthy begin $LOAD_PATH.push(tmp_dir) yield ensure $LOAD_PATH.delete(tmp_dir) end end end