Class: GRPC::RpcServer
- Inherits: Object (Object → GRPC::RpcServer)
- Extended by: Forwardable
- Includes: Core::CallOps, Core::TimeConsts
- Defined in: src/ruby/lib/grpc/generic/rpc_server.rb
Overview
RpcServer hosts a number of services and makes them available on the network.
Constant Summary
- DEFAULT_POOL_SIZE = 30
  Default thread pool size is 30
- DEFAULT_MAX_WAITING_REQUESTS = 20
  Deprecated due to internal changes to the thread pool
- DEFAULT_POLL_PERIOD = 1
  Default poll period is 1s
- SIGNAL_CHECK_PERIOD = 0.25
  Signal check period is 0.25s
Class Method Summary
- .setup_connect_md_proc(a_proc) ⇒ Object
  setup_connect_md_proc is used by #initialize to validate the connect_md_proc.
Instance Method Summary
- #available?(an_rpc) ⇒ Boolean
  Sends RESOURCE_EXHAUSTED if there are too many unprocessed jobs.
- #handle(service) ⇒ Object
  handle registration of classes.
- #implemented?(an_rpc) ⇒ Boolean
  Sends UNIMPLEMENTED if the method is not implemented by this server.
- #initialize(pool_size: DEFAULT_POOL_SIZE, max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS, poll_period: DEFAULT_POLL_PERIOD, pool_keep_alive: GRPC::RpcServer::DEFAULT_POOL_SIZE, connect_md_proc: nil, server_args: {}, interceptors: []) ⇒ RpcServer (constructor)
  Creates a new RpcServer.
- #loop_handle_server_calls ⇒ Object
  handles calls to the server.
- #new_active_server_call(an_rpc) ⇒ Object
- #run ⇒ Object (also: #run_till_terminated)
  runs the server.
- #running? ⇒ Boolean
- #running_state ⇒ Object
- #stop ⇒ Object
  stops a running server.
- #stopped? ⇒ Boolean
- #transition_running_state(target_state) ⇒ Object
  Can only be called while holding @run_mutex.
- #wait_till_running(timeout = nil) ⇒ true, false
  Is called from other threads to wait for #run to start up the server.
Methods included from Core::TimeConsts
Constructor Details
#initialize(pool_size: DEFAULT_POOL_SIZE, max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS, poll_period: DEFAULT_POLL_PERIOD, pool_keep_alive: GRPC::RpcServer::DEFAULT_POOL_SIZE, connect_md_proc: nil, server_args: {}, interceptors: []) ⇒ RpcServer
Creates a new RpcServer.
The server is configured through the following keyword arguments:
- pool_size: the size of the thread pool the server uses to run its threads. The server cannot handle more concurrent requests than the size of the thread pool.
- max_waiting_requests: Deprecated due to internal changes to the thread pool. This is still an argument for compatibility but is ignored.
- poll_period: The amount of time in seconds to wait for currently-serviced RPCs to finish before cancelling them when shutting down the server.
- pool_keep_alive: The amount of time in seconds to wait for currently busy thread-pool threads to finish before forcing an abrupt exit to each thread.
- connect_md_proc: when non-nil, a proc for determining metadata to send back to the client on receiving an invocation request. The proc signature is:
  {key: val, ..} func(method_name, {key: val, ...})
- server_args: A server arguments hash to be passed down to the underlying core server.
- interceptors: An array of GRPC::ServerInterceptor objects that will be used for intercepting server handlers to provide extra functionality. Interceptors are an EXPERIMENTAL API.
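As an illustration of the connect_md_proc signature, the sketch below defines a proc that takes the invoked method name and the client's metadata hash and returns a hash of metadata to send back. The metadata keys (served-method, request-id) and the method name /math.Math/Div are hypothetical, and the commented-out constructor call assumes the grpc gem is loaded.

```ruby
# A hypothetical connect_md_proc: receives the invoked method name and the
# client's metadata hash, and returns a hash of metadata to send back.
connect_md = proc do |method_name, request_md|
  {
    'served-method' => method_name.to_s,
    # Echo a (hypothetical) request id if the client supplied one.
    'request-id' => request_md.fetch('request-id', 'unknown')
  }
end

# With the grpc gem loaded, the proc would be passed as a keyword argument:
#   server = GRPC::RpcServer.new(pool_size: 10, connect_md_proc: connect_md)

# The proc can be exercised on its own, without a server:
reply_md = connect_md.call('/math.Math/Div', { 'request-id' => 'abc123' })
puts reply_md.inspect
```

Keeping the proc free of side effects makes it easy to check in isolation before handing it to the server.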
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 217
def initialize(pool_size: DEFAULT_POOL_SIZE,
               max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS,
               poll_period: DEFAULT_POLL_PERIOD,
               pool_keep_alive: GRPC::RpcServer::DEFAULT_POOL_SIZE,
               connect_md_proc: nil,
               server_args: {},
               interceptors: [])
  @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
  @max_waiting_requests = max_waiting_requests
  @poll_period = poll_period
  @pool_size = pool_size
  @pool = Pool.new(@pool_size, keep_alive: pool_keep_alive)
  @run_cond = ConditionVariable.new
  @run_mutex = Mutex.new
  # running_state can take 4 values: :not_started, :running, :stopping, and
  # :stopped. State transitions can only proceed in that order.
  @running_state = :not_started
  @server = Core::Server.new(server_args)
  @interceptors = InterceptorRegistry.new(interceptors)
end
Class Method Details
.setup_connect_md_proc(a_proc) ⇒ Object
setup_connect_md_proc is used by #initialize to validate the connect_md_proc.
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 176
def self.setup_connect_md_proc(a_proc)
  return nil if a_proc.nil?
  fail(TypeError, '!Proc') unless a_proc.is_a? Proc
  a_proc
end
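To make the validation rules concrete, here is a standalone copy of the same logic written as a top-level method, so it runs without the grpc gem. It is a sketch for illustration only: nil is accepted, a Proc is returned unchanged, and anything else raises TypeError.

```ruby
# Standalone copy of the validation logic, for illustration only.
def setup_connect_md_proc(a_proc)
  return nil if a_proc.nil?
  fail(TypeError, '!Proc') unless a_proc.is_a? Proc
  a_proc
end

# nil means "no connect metadata proc" and is passed through.
p setup_connect_md_proc(nil)

# A Proc (or lambda) is returned as the very same object.
md_proc = proc { |_method_name, _metadata| {} }
p setup_connect_md_proc(md_proc).equal?(md_proc)

# Any other value is rejected with a TypeError.
begin
  setup_connect_md_proc('not a proc')
rescue TypeError => e
  puts "rejected: #{e.message}"
end
```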
Instance Method Details
#available?(an_rpc) ⇒ Boolean
Sends RESOURCE_EXHAUSTED if there are too many unprocessed jobs
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 358
def available?(an_rpc)
  return an_rpc if @pool.ready_for_work?
  GRPC.logger.warn('no free worker threads currently')
  noop = proc { |x| x }

  # Create a new active call that knows that metadata hasn't been
  # sent yet
  c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
                     metadata_received: true, started: false)
  c.send_status(GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED,
                'No free threads in thread pool')
  nil
end
#handle(service) ⇒ Object
handle registration of classes
service is either a class that includes GRPC::GenericService and whose #new function can be called without argument or any instance of such a class.
E.g., after
class Divider
  include GRPC::GenericService
  rpc :div, DivArgs, DivReply # single request, single response
  def initialize(optional_arg='default option') # no args
    ...
  end
end
srv = GRPC::RpcServer.new(…)
# Either of these works
srv.handle(Divider)
# or
srv.handle(Divider.new('replace optional arg'))
It raises RuntimeError:
- if service is not a valid service class or object
- if its handler methods are already registered
- if the server is already running
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 326
def handle(service)
  @run_mutex.synchronize do
    unless @running_state == :not_started
      fail 'cannot add services if the server has been started'
    end
    cls = service.is_a?(Class) ? service : service.class
    assert_valid_service_class(cls)
    add_rpc_descs_for(service)
  end
end
#implemented?(an_rpc) ⇒ Boolean
Sends UNIMPLEMENTED if the method is not implemented by this server
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 373
def implemented?(an_rpc)
  mth = an_rpc.method.to_sym
  return an_rpc if rpc_descs.key?(mth)
  GRPC.logger.warn("UNIMPLEMENTED: #{an_rpc}")
  noop = proc { |x| x }

  # Create a new active call that knows that
  # metadata hasn't been sent yet
  c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
                     metadata_received: true, started: false)
  c.send_status(GRPC::Core::StatusCodes::UNIMPLEMENTED, '')
  nil
end
#loop_handle_server_calls ⇒ Object
handles calls to the server
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 388
def loop_handle_server_calls
  fail 'not started' if running_state == :not_started
  while running_state == :running
    begin
      an_rpc = @server.request_call
      break if (!an_rpc.nil?) && an_rpc.call.nil?
      active_call = new_active_server_call(an_rpc)
      unless active_call.nil?
        @pool.schedule(active_call) do |ac|
          c, mth = ac
          begin
            rpc_descs[mth].run_server_method(
              c,
              rpc_handlers[mth],
              @interceptors.build_context
            )
          rescue StandardError
            c.send_status(GRPC::Core::StatusCodes::INTERNAL,
                          'Server handler failed')
          end
        end
      end
    rescue Core::CallError, RuntimeError => e
      # these might happen for various reasons. The correct behavior of
      # the server is to log them and continue, if it's not shutting down.
      if running_state == :running
        GRPC.logger.warn("server call failed: #{e}")
      end
      next
    end
  end
  # @running_state should be :stopping here
  @run_mutex.synchronize do
    transition_running_state(:stopped)
    GRPC.logger.info("stopped: #{self}")
    @server.close
  end
end
#new_active_server_call(an_rpc) ⇒ Object
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 427
def new_active_server_call(an_rpc)
  return nil if an_rpc.nil? || an_rpc.call.nil?

  # allow the metadata to be accessed from the call
  an_rpc.call.metadata = an_rpc.metadata # attaches md to call for handlers
  connect_md = nil
  unless @connect_md_proc.nil?
    connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata)
  end
  return nil unless available?(an_rpc)
  return nil unless implemented?(an_rpc)

  # Create the ActiveCall. Indicate that metadata hasn't been sent yet.
  GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
  rpc_desc = rpc_descs[an_rpc.method.to_sym]
  c = ActiveCall.new(an_rpc.call,
                     rpc_desc.marshal_proc,
                     rpc_desc.unmarshal_proc(:input),
                     an_rpc.deadline,
                     metadata_received: true,
                     started: false,
                     metadata_to_send: connect_md)
  c.attach_peer_cert(an_rpc.call.peer_cert)
  mth = an_rpc.method.to_sym
  [c, mth]
end
#run ⇒ Object Also known as: run_till_terminated
runs the server
- if no rpc_descs are registered, this raises a RuntimeError immediately; otherwise it keeps running and does not return until the server is stopped or the program exits.
- #running? returns true after this is called, until #stop causes the server to stop.
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 344
def run
  @run_mutex.synchronize do
    fail 'cannot run without registering services' if rpc_descs.size.zero?
    @pool.start
    @server.start
    transition_running_state(:running)
    @run_cond.broadcast
  end
  loop_handle_server_calls
end
#running? ⇒ Boolean
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 273
def running?
  running_state == :running
end
#running_state ⇒ Object
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 253
def running_state
  @run_mutex.synchronize do
    return @running_state
  end
end
#stop ⇒ Object
stops a running server
The call has no impact if the server is already stopped; otherwise the server's current call loop is its last.
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 242
def stop
  @run_mutex.synchronize do
    fail 'Cannot stop before starting' if @running_state == :not_started
    return if @running_state != :running
    transition_running_state(:stopping)
    deadline = from_relative_time(@poll_period)
    @server.shutdown_and_notify(deadline)
  end
  @pool.stop
end
#stopped? ⇒ Boolean
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 277
def stopped?
  running_state == :stopped
end
#transition_running_state(target_state) ⇒ Object
Can only be called while holding @run_mutex
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 260
def transition_running_state(target_state)
  state_transitions = {
    not_started: :running,
    running: :stopping,
    stopping: :stopped
  }
  if state_transitions[@running_state] == target_state
    @running_state = target_state
  else
    fail "Bad server state transition: #{@running_state}->#{target_state}"
  end
end
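The transition table above allows exactly one successor per state, so the lifecycle can only move forward. The following sketch reuses that table in a hypothetical LifecycleSketch class (not part of the gem, and without the mutex the real method relies on) to show a legal sequence followed by a rejected transition.

```ruby
# Hypothetical stand-in that reuses the transition table shown above.
class LifecycleSketch
  TRANSITIONS = {
    not_started: :running,
    running: :stopping,
    stopping: :stopped
  }.freeze

  attr_reader :state

  def initialize
    @state = :not_started
  end

  # Moves to target_state only if it is the single allowed successor.
  def transition(target_state)
    unless TRANSITIONS[@state] == target_state
      fail "Bad server state transition: #{@state}->#{target_state}"
    end
    @state = target_state
  end
end

sketch = LifecycleSketch.new
sketch.transition(:running)
sketch.transition(:stopping)
sketch.transition(:stopped)

begin
  sketch.transition(:running) # :stopped has no successor, so this raises
rescue RuntimeError => e
  puts e.message
end
```

Because :stopped has no entry in the table, a stopped server cannot be restarted through this mechanism; a new instance is needed instead.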
#wait_till_running(timeout = nil) ⇒ true, false
Is called from other threads to wait for #run to start up the server.
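If #run has not been called yet, this waits until the server starts or the optional timeout (in seconds) elapses. It returns true if the server is running and false otherwise.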
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 287
def wait_till_running(timeout = nil)
  @run_mutex.synchronize do
    @run_cond.wait(@run_mutex, timeout) if @running_state == :not_started
    return @running_state == :running
  end
end
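The handshake between #run and #wait_till_running can be sketched with a plain Mutex and ConditionVariable from the Ruby standard library. StartupSketch below is a hypothetical stand-in, not the gem's class: its run method only flips the state and broadcasts, where the real #run also starts the pool and the core server.

```ruby
# Hypothetical stand-in that mirrors the mutex/condition-variable handshake
# between #run and #wait_till_running.
class StartupSketch
  def initialize
    @run_mutex = Mutex.new
    @run_cond = ConditionVariable.new
    @running_state = :not_started
  end

  # Marks the sketch as running and wakes every waiting thread.
  def run
    @run_mutex.synchronize do
      @running_state = :running
      @run_cond.broadcast
    end
  end

  # Waits (up to timeout seconds) for #run, then reports whether it is running.
  def wait_till_running(timeout = nil)
    @run_mutex.synchronize do
      @run_cond.wait(@run_mutex, timeout) if @running_state == :not_started
      return @running_state == :running
    end
  end
end

# run is never called here, so the wait ends when the short timeout elapses.
never_started = StartupSketch.new
puts never_started.wait_till_running(0.05)

# Here another thread calls run shortly after the wait begins.
sketch = StartupSketch.new
starter = Thread.new do
  sleep 0.05
  sketch.run
end
puts sketch.wait_till_running(2)
starter.join
```

Passing a timeout keeps a caller from blocking indefinitely when the thread that should call run fails before it gets there.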