Class: GRPC::RpcServer
- Inherits:
-
Object
- Object
- GRPC::RpcServer
- Extended by:
- Forwardable
- Includes:
- Core::CallOps, Core::TimeConsts
- Defined in:
- src/ruby/lib/grpc/generic/rpc_server.rb
Overview
RpcServer hosts a number of services and makes them available on the network.
Constant Summary
- DEFAULT_POOL_SIZE =
Default thread pool size is 30
30
- DEFAULT_MAX_WAITING_REQUESTS =
Deprecated due to internal changes to the thread pool
20
- DEFAULT_POLL_PERIOD =
Default poll period is 1s
1
- SIGNAL_CHECK_PERIOD =
Signal check period is 0.25s
0.25
Class Method Summary
-
.setup_connect_md_proc(a_proc) ⇒ Object
setup_connect_md_proc is used by #initialize to validate the connect_md_proc.
Instance Method Summary
-
#available?(an_rpc) ⇒ Boolean
Sends RESOURCE_EXHAUSTED if there are too many unprocessed jobs.
-
#handle(service) ⇒ Object
handle registration of classes.
-
#implemented?(an_rpc) ⇒ Boolean
Sends UNIMPLEMENTED if the method is not implemented by this server.
-
#initialize(pool_size: DEFAULT_POOL_SIZE, max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS, poll_period: DEFAULT_POLL_PERIOD, connect_md_proc: nil, server_args: {}, interceptors: []) ⇒ RpcServer
constructor
Creates a new RpcServer.
-
#loop_handle_server_calls ⇒ Object
handles calls to the server.
- #new_active_server_call(an_rpc) ⇒ Object
-
#run ⇒ Object
(also: #run_till_terminated)
runs the server.
- #running? ⇒ Boolean
- #running_state ⇒ Object
-
#stop ⇒ Object
stops a running server.
- #stopped? ⇒ Boolean
-
#transition_running_state(target_state) ⇒ Object
Can only be called while holding @run_mutex.
-
#wait_till_running(timeout = nil) ⇒ Object
Is called from other threads to wait for #run to start up the server.
Methods included from Core::TimeConsts
Constructor Details
#initialize(pool_size: DEFAULT_POOL_SIZE, max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS, poll_period: DEFAULT_POLL_PERIOD, connect_md_proc: nil, server_args: {}, interceptors: []) ⇒ RpcServer
Creates a new RpcServer.
The RPC server is configured entirely through the following keyword arguments:
-
pool_size: the size of the thread pool the server uses to run its
threads. No more concurrent requests can be made than the size of the thread pool
-
max_waiting_requests: Deprecated due to internal changes to the thread
pool. This is still an argument for compatibility but is ignored.
-
poll_period: when present, the server polls for new events with this
period
-
connect_md_proc:
when non-nil, a proc that determines the metadata to send back to the client on receiving an invocation request. The proc signature is: {key: val, ..} func(method_name, {key: val, ...})
-
server_args:
A server arguments hash to be passed down to the underlying core server
-
interceptors:
An array of GRPC::ServerInterceptor objects that will be used for intercepting server handlers to provide extra functionality. Interceptors are an EXPERIMENTAL API.
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 205
def initialize(pool_size: DEFAULT_POOL_SIZE,
               max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS,
               poll_period: DEFAULT_POLL_PERIOD,
               connect_md_proc: nil,
               server_args: {},
               interceptors: [])
  @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
  @max_waiting_requests = max_waiting_requests
  @poll_period = poll_period
  @pool_size = pool_size
  @pool = Pool.new(@pool_size)
  @run_cond = ConditionVariable.new
  @run_mutex = Mutex.new
  # running_state can take 4 values: :not_started, :running, :stopping, and
  # :stopped. State transitions can only proceed in that order.
  @running_state = :not_started
  @server = Core::Server.new(server_args)
  @interceptors = InterceptorRegistry.new(interceptors)
end
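To make the connect_md_proc contract more concrete, here is a minimal sketch in plain Ruby that runs without the grpc gem. The constant name CONNECT_MD, the method path, and the metadata keys are hypothetical; the only parts taken from the description above are the call shape (method name plus a metadata hash) and the hash return value.

```ruby
# Hypothetical connect_md_proc: receives the invoked method name and the
# metadata hash that arrived with the request, and returns the metadata
# hash to send back to the client.
CONNECT_MD = proc do |method_name, metadata|
  {
    'handled-method' => method_name.to_s,                 # illustrative key
    'request-id' => metadata.fetch('request-id', 'none')  # echo if present
  }
end

# Call it directly with made-up values to show the shape of the result.
reply_md = CONNECT_MD.call('/math.Math/Div', { 'request-id' => 'abc123' })
puts reply_md.inspect
```

A proc like this would be handed to the constructor through the connect_md_proc: keyword argument.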
Class Method Details
.setup_connect_md_proc(a_proc) ⇒ Object
setup_connect_md_proc is used by #initialize to validate the connect_md_proc.
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 169
def self.setup_connect_md_proc(a_proc)
  return nil if a_proc.nil?
  fail(TypeError, '!Proc') unless a_proc.is_a? Proc
  a_proc
end
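The effect of this check can be explored without the grpc gem. The sketch below uses a stand-alone replica of the method (check_connect_md_proc and lookup_md are hypothetical names, not part of the library) to show which values pass: nil and any Proc, including lambdas, are accepted, while a Method object is rejected until it is converted with to_proc.

```ruby
# Stand-alone replica of the validation shown above (hypothetical name),
# so the behaviour can be tried without loading grpc.
def check_connect_md_proc(a_proc)
  return nil if a_proc.nil?
  raise TypeError, '!Proc' unless a_proc.is_a?(Proc)
  a_proc
end

# Hypothetical metadata lookup written as an ordinary method.
def lookup_md(_method_name, _metadata)
  {}
end

as_lambda = ->(_method_name, _metadata) { {} }
puts check_connect_md_proc(nil).inspect                 # nil is allowed
puts check_connect_md_proc(as_lambda).equal?(as_lambda) # lambdas are Procs

begin
  check_connect_md_proc(method(:lookup_md))             # Method is not a Proc
rescue TypeError => e
  puts "rejected: #{e.message}"
end

# Converting the Method object first passes the check.
puts check_connect_md_proc(method(:lookup_md).to_proc).is_a?(Proc)
```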
Instance Method Details
#available?(an_rpc) ⇒ Boolean
Sends RESOURCE_EXHAUSTED if there are too many unprocessed jobs
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 345
def available?(an_rpc)
  return an_rpc if @pool.ready_for_work?
  GRPC.logger.warn('no free worker threads currently')
  noop = proc { |x| x }

  # Create a new active call that knows that metadata hasn't been
  # sent yet
  c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
                     metadata_received: true, started: false)
  c.send_status(GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED, '')
  nil
end
#handle(service) ⇒ Object
handle registration of classes
service is either a class that includes GRPC::GenericService and whose #new method can be called without arguments, or any instance of such a class.
E.g., after
class Divider
  include GRPC::GenericService
  rpc :div, DivArgs, DivReply # single request, single response
  def initialize(optional_arg='default option') # no required args
    ...
  end
end
srv = GRPC::RpcServer.new(…)
# Either of these works
srv.handle(Divider)
# or
srv.handle(Divider.new('replace optional arg'))
It raises RuntimeError:
-
if service is not a valid service class or object
-
if its handler methods are already registered
-
if the server is already running
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 313
def handle(service)
  @run_mutex.synchronize do
    unless @running_state == :not_started
      fail 'cannot add services if the server has been started'
    end
    cls = service.is_a?(Class) ? service : service.class
    assert_valid_service_class(cls)
    add_rpc_descs_for(service)
  end
end
#implemented?(an_rpc) ⇒ Boolean
Sends UNIMPLEMENTED if the method is not implemented by this server
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 359
def implemented?(an_rpc)
  mth = an_rpc.method.to_sym
  return an_rpc if rpc_descs.key?(mth)
  GRPC.logger.warn("UNIMPLEMENTED: #{an_rpc}")
  noop = proc { |x| x }

  # Create a new active call that knows that
  # metadata hasn't been sent yet
  c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
                     metadata_received: true, started: false)
  c.send_status(GRPC::Core::StatusCodes::UNIMPLEMENTED, '')
  nil
end
#loop_handle_server_calls ⇒ Object
handles calls to the server
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 374
def loop_handle_server_calls
  fail 'not started' if running_state == :not_started
  while running_state == :running
    begin
      an_rpc = @server.request_call
      break if (!an_rpc.nil?) && an_rpc.call.nil?
      active_call = new_active_server_call(an_rpc)
      unless active_call.nil?
        @pool.schedule(active_call) do |ac|
          c, mth = ac
          begin
            rpc_descs[mth].run_server_method(
              c,
              rpc_handlers[mth],
              @interceptors.build_context
            )
          rescue StandardError
            c.send_status(GRPC::Core::StatusCodes::INTERNAL,
                          'Server handler failed')
          end
        end
      end
    rescue Core::CallError, RuntimeError => e
      # these might happen for various reasons. The correct behavior of
      # the server is to log them and continue, if it's not shutting down.
      if running_state == :running
        GRPC.logger.warn("server call failed: #{e}")
      end
      next
    end
  end
  # @running_state should be :stopping here
  @run_mutex.synchronize { transition_running_state(:stopped) }
  GRPC.logger.info("stopped: #{self}")
end
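The inner begin/rescue in the listing above isolates each handler: a StandardError raised by one call is turned into an INTERNAL status for that call only, and later calls are still served. The toy model below illustrates just that pattern in plain Ruby. run_jobs, INTERNAL, and the handler names are hypothetical stand-ins; no real call objects or thread pool are involved.

```ruby
# Stand-in for GRPC::Core::StatusCodes::INTERNAL (assumption for this sketch).
INTERNAL = :internal

# Runs each handler inside its own begin/rescue, as the scheduled block in
# the listing above does, and records the outcome instead of sending a status.
def run_jobs(handlers)
  handlers.map do |name, handler|
    begin
      [name, :ok, handler.call]
    rescue StandardError => e
      # Models c.send_status(INTERNAL, 'Server handler failed')
      [name, INTERNAL, "Server handler failed (#{e.class})"]
    end
  end
end

RESULTS = run_jobs({
  div_ok: -> { 10 / 2 },
  div_by_zero: -> { 10 / 0 },           # raises ZeroDivisionError
  after_failure: -> { 'still served' }  # runs despite the earlier failure
})
RESULTS.each { |r| puts r.inspect }
```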
#new_active_server_call(an_rpc) ⇒ Object
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 410
def new_active_server_call(an_rpc)
  return nil if an_rpc.nil? || an_rpc.call.nil?

  # allow the metadata to be accessed from the call
  an_rpc.call.metadata = an_rpc.metadata # attaches md to call for handlers
  connect_md = nil
  unless @connect_md_proc.nil?
    connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata)
  end
  return nil unless available?(an_rpc)
  return nil unless implemented?(an_rpc)

  # Create the ActiveCall. Indicate that metadata hasn't been sent yet.
  GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
  rpc_desc = rpc_descs[an_rpc.method.to_sym]
  c = ActiveCall.new(an_rpc.call,
                     rpc_desc.marshal_proc,
                     rpc_desc.unmarshal_proc(:input),
                     an_rpc.deadline,
                     metadata_received: true,
                     started: false,
                     metadata_to_send: connect_md)
  c.attach_peer_cert(an_rpc.call.peer_cert)
  mth = an_rpc.method.to_sym
  [c, mth]
end
#run ⇒ Object Also known as: run_till_terminated
runs the server
-
if no rpc_descs are registered, this fails immediately with a RuntimeError; otherwise it blocks while handling calls and does not return until the server is stopped.
-
#running? returns true after this is called, until #stop causes the server to stop.
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 331
def run
  @run_mutex.synchronize do
    fail 'cannot run without registering services' if rpc_descs.size.zero?
    @pool.start
    @server.start
    transition_running_state(:running)
    @run_cond.broadcast
  end
  loop_handle_server_calls
end
#running? ⇒ Boolean
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 260
def running?
  running_state == :running
end
#running_state ⇒ Object
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 240
def running_state
  @run_mutex.synchronize do
    return @running_state
  end
end
#stop ⇒ Object
stops a running server
The call has no effect if the server is already stopping or stopped, and fails with a RuntimeError if the server was never started; otherwise the server's current call loop is its last.
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 229
def stop
  @run_mutex.synchronize do
    fail 'Cannot stop before starting' if @running_state == :not_started
    return if @running_state != :running
    transition_running_state(:stopping)
  end
  deadline = from_relative_time(@poll_period)
  @server.close(deadline)
  @pool.stop
end
#stopped? ⇒ Boolean
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 264
def stopped?
  running_state == :stopped
end
#transition_running_state(target_state) ⇒ Object
Can only be called while holding @run_mutex
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 247
def transition_running_state(target_state)
  state_transitions = {
    not_started: :running,
    running: :stopping,
    stopping: :stopped
  }
  if state_transitions[@running_state] == target_state
    @running_state = target_state
  else
    fail "Bad server state transition: #{@running_state}->#{target_state}"
  end
end
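The transition table allows exactly one successor per state, so the lifecycle is strictly linear. The following stand-alone sketch wraps a copy of that table in a small class (ToyState is a hypothetical name used only here) to show a legal sequence and a rejected jump.

```ruby
# Hypothetical wrapper around a copy of the transition table shown above.
class ToyState
  TRANSITIONS = {
    not_started: :running,
    running: :stopping,
    stopping: :stopped
  }.freeze

  attr_reader :state

  def initialize
    @state = :not_started
  end

  # Moves to target only if it is the single allowed successor.
  def transition(target)
    unless TRANSITIONS[@state] == target
      raise "Bad server state transition: #{@state}->#{target}"
    end
    @state = target
  end
end

s = ToyState.new
%i[running stopping stopped].each { |t| s.transition(t) }
puts s.state # prints "stopped"

begin
  ToyState.new.transition(:stopped) # skipping states is rejected
rescue RuntimeError => e
  puts e.message # prints "Bad server state transition: not_started->stopped"
end
```

Because :stopped has no successor in the table, a server modelled this way cannot be restarted once it has stopped.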
#wait_till_running(timeout = nil) ⇒ Object
Is called from other threads to wait for #run to start up the server.
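If the server is still in the :not_started state, this waits on the run condition variable for up to timeout seconds (indefinitely when timeout is nil); otherwise it returns immediately. The return value is true only if the server is running.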
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 274
def wait_till_running(timeout = nil)
  @run_mutex.synchronize do
    @run_cond.wait(@run_mutex, timeout) if @running_state == :not_started
    return @running_state == :running
  end
end
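Since #run blocks the calling thread, a common arrangement is to start it on a background thread and use wait_till_running from the main thread before issuing requests. The sketch below models that handshake with a toy class in plain Ruby. ToyServer is hypothetical, serves no RPCs, and replaces the call loop with a short sleep, but it uses the same Mutex and ConditionVariable pattern as the listings above.

```ruby
# Toy model of the run / wait_till_running / stop handshake (hypothetical).
class ToyServer
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @state = :not_started
  end

  def run
    @mutex.synchronize do
      @state = :running
      @cond.broadcast # wake any thread blocked in wait_till_running
    end
    sleep 0.01 while running? # stand-in for the call-handling loop
    @mutex.synchronize { @state = :stopped }
  end

  def stop
    @mutex.synchronize { @state = :stopping if @state == :running }
  end

  def running?
    @mutex.synchronize { @state == :running }
  end

  def stopped?
    @mutex.synchronize { @state == :stopped }
  end

  def wait_till_running(timeout = nil)
    @mutex.synchronize do
      @cond.wait(@mutex, timeout) if @state == :not_started
      @state == :running
    end
  end
end

server = ToyServer.new
worker = Thread.new { server.run } # run blocks, so give it its own thread
puts server.wait_till_running(1)   # blocks until run has signalled
server.stop
worker.join                        # run returns once the loop has ended
puts server.stopped?
```

With the real class the same shape would apply: start run on a thread, call wait_till_running, and later call stop followed by a join on that thread.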