Class: GRPC::RpcServer
- Inherits:
- Object
- Object
- GRPC::RpcServer
- Extended by:
- Forwardable
- Includes:
- Core::CallOps, Core::TimeConsts
- Defined in:
- src/ruby/lib/grpc/generic/rpc_server.rb
Overview
RpcServer hosts a number of services and makes them available on the network.
Constant Summary
- DEFAULT_POOL_SIZE = 30
Default thread pool size is 30
- DEFAULT_MAX_WAITING_REQUESTS = 20
Deprecated due to internal changes to the thread pool
- DEFAULT_POLL_PERIOD = 1
Default poll period is 1s
- SIGNAL_CHECK_PERIOD = 0.25
Signal check period is 0.25s
Class Method Summary
- .setup_connect_md_proc(a_proc) ⇒ Object
setup_connect_md_proc is used by #initialize to validate the connect_md_proc.
Instance Method Summary
- #available?(an_rpc) ⇒ Boolean
Sends RESOURCE_EXHAUSTED if there are too many unprocessed jobs.
- #handle(service) ⇒ Object
handle registration of classes.
- #implemented?(an_rpc) ⇒ Boolean
Sends UNIMPLEMENTED if the method is not implemented by this server.
- #initialize(pool_size: DEFAULT_POOL_SIZE, max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS, poll_period: DEFAULT_POLL_PERIOD, connect_md_proc: nil, server_args: {}) ⇒ RpcServer
(constructor)
Creates a new RpcServer.
- #loop_handle_server_calls ⇒ Object
handles calls to the server.
- #new_active_server_call(an_rpc) ⇒ Object
- #run ⇒ Object
(also: #run_till_terminated)
runs the server.
- #running? ⇒ Boolean
- #running_state ⇒ Object
- #stop ⇒ Object
stops a running server.
- #stopped? ⇒ Boolean
- #transition_running_state(target_state) ⇒ Object
Can only be called while holding @run_mutex.
- #wait_till_running(timeout = nil) ⇒ Object
Is called from other threads to wait for #run to start up the server.
Methods included from Core::TimeConsts
Constructor Details
#initialize(pool_size: DEFAULT_POOL_SIZE, max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS, poll_period: DEFAULT_POLL_PERIOD, connect_md_proc: nil, server_args: {}) ⇒ RpcServer
Creates a new RpcServer.
The server is configured through the following keyword arguments:
- pool_size: the size of the thread pool the server uses to run its handlers. The server processes at most pool_size requests concurrently.
- max_waiting_requests: deprecated due to internal changes to the thread pool. It is still accepted for compatibility but is ignored.
- poll_period: when present, the server polls for new events with this period (in seconds).
- connect_md_proc: when non-nil, a proc that determines the metadata to send back to the client on receiving an invocation request. The proc signature is: {key: val, ..} func(method_name, {key: val, ...})
- server_args: a hash of server arguments passed down to the underlying core server.
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 214
def initialize(pool_size:DEFAULT_POOL_SIZE,
               max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS,
               poll_period:DEFAULT_POLL_PERIOD,
               connect_md_proc:nil,
               server_args:{})
  @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
  @max_waiting_requests = max_waiting_requests
  @poll_period = poll_period
  @pool_size = pool_size
  @pool = Pool.new(@pool_size)
  @run_cond = ConditionVariable.new
  @run_mutex = Mutex.new
  # running_state can take 4 values: :not_started, :running, :stopping, and
  # :stopped. State transitions can only proceed in that order.
  @running_state = :not_started
  @server = Core::Server.new(server_args)
end
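The connect_md_proc contract described above can be illustrated without a running server. The following is a minimal sketch in plain Ruby: validate_connect_md_proc is a hypothetical stand-in that mirrors the check in setup_connect_md_proc, and the metadata keys and method name are invented for the example.

```ruby
# Hypothetical stand-in mirroring RpcServer.setup_connect_md_proc:
# nil is allowed, anything else must be a Proc.
def validate_connect_md_proc(a_proc)
  return nil if a_proc.nil?
  raise TypeError, '!Proc' unless a_proc.is_a?(Proc)
  a_proc
end

# A proc with the documented shape: it receives the method name and the
# client's metadata hash, and returns the metadata hash to send back.
# The 'served-method' and 'request-id' keys are made up for illustration.
echo_md = proc do |method_name, client_md|
  { 'served-method' => method_name.to_s,
    'request-id' => client_md.fetch('request-id', 'unknown') }
end

connect_md_proc = validate_connect_md_proc(echo_md)
reply_md = connect_md_proc.call('/math.Math/Div', { 'request-id' => 'abc123' })
```

In real use, a proc like echo_md would be passed as the connect_md_proc: keyword argument; the server then calls it with the method name and the incoming metadata of each request.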
Class Method Details
.setup_connect_md_proc(a_proc) ⇒ Object
setup_connect_md_proc is used by #initialize to validate the connect_md_proc.
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 184
def self.setup_connect_md_proc(a_proc)
  return nil if a_proc.nil?
  fail(TypeError, '!Proc') unless a_proc.is_a? Proc
  a_proc
end
Instance Method Details
#available?(an_rpc) ⇒ Boolean
Sends RESOURCE_EXHAUSTED if there are too many unprocessed jobs
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 352
def available?(an_rpc)
  return an_rpc if @pool.ready_for_work?
  GRPC.logger.warn('no free worker threads currently')
  noop = proc { |x| x }

  # Create a new active call that knows that metadata hasn't been
  # sent yet
  c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
                     metadata_received: true, started: false)
  c.send_status(GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED, '')
  nil
end
#handle(service) ⇒ Object
Handles registration of service classes.
service is either a class that includes GRPC::GenericService and whose #new method can be called without arguments, or any instance of such a class.
E.g., after
class Divider
  include GRPC::GenericService
  rpc :div, DivArgs, DivReply   # single request, single response
  def initialize(optional_arg='default option')  # no args
    ...
  end
end
srv = GRPC::RpcServer.new(…)
# Either of these works
srv.handle(Divider)
# or
srv.handle(Divider.new('replace optional arg'))
It raises RuntimeError:
- if service is not a valid service class or object
- if its handler methods are already registered
- if the server has already been started
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 320
def handle(service)
  @run_mutex.synchronize do
    unless @running_state == :not_started
      fail 'cannot add services if the server has been started'
    end
    cls = service.is_a?(Class) ? service : service.class
    assert_valid_service_class(cls)
    add_rpc_descs_for(service)
  end
end
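The registration rules for #handle can be imitated in a few lines of plain Ruby. This is only a sketch under stated assumptions: RegistrySketch is hypothetical, it keys handlers by class name rather than by RPC route, and it assumes a class passed to handle is instantiated with no arguments. It does not reproduce assert_valid_service_class or add_rpc_descs_for, whose bodies are not shown on this page.

```ruby
# Hypothetical registry that only imitates the documented rules of #handle.
class RegistrySketch
  def initialize
    @handlers = {}
    @started = false
  end

  def start
    @started = true
  end

  def handle(service)
    raise 'cannot add services if the server has been started' if @started
    # Accept either a class (instantiated without arguments) or an instance.
    instance = service.is_a?(Class) ? service.new : service
    name = instance.class.name
    raise "already registered: #{name}" if @handlers.key?(name)
    @handlers[name] = instance
  end

  def handler_for(name)
    @handlers[name]
  end
end

# Simplified Divider without GRPC::GenericService, for illustration only.
class Divider
  attr_reader :option

  def initialize(option = 'default option')
    @option = option
  end
end

registry = RegistrySketch.new
registry.handle(Divider)
```

Passing Divider.new('replace optional arg') instead of the class would register that instance as-is, which is the reason #handle accepts both forms.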
#implemented?(an_rpc) ⇒ Boolean
Sends UNIMPLEMENTED if the method is not implemented by this server
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 366
def implemented?(an_rpc)
  mth = an_rpc.method.to_sym
  return an_rpc if rpc_descs.key?(mth)
  GRPC.logger.warn("UNIMPLEMENTED: #{an_rpc}")
  noop = proc { |x| x }

  # Create a new active call that knows that
  # metadata hasn't been sent yet
  c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
                     metadata_received: true, started: false)
  c.send_status(GRPC::Core::StatusCodes::UNIMPLEMENTED, '')
  nil
end
#loop_handle_server_calls ⇒ Object
handles calls to the server
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 381
def loop_handle_server_calls
  fail 'not started' if running_state == :not_started
  while running_state == :running
    begin
      an_rpc = @server.request_call
      break if (!an_rpc.nil?) && an_rpc.call.nil?
      active_call = new_active_server_call(an_rpc)
      unless active_call.nil?
        @pool.schedule(active_call) do |ac|
          c, mth = ac
          begin
            rpc_descs[mth].run_server_method(c, rpc_handlers[mth])
          rescue StandardError
            c.send_status(GRPC::Core::StatusCodes::INTERNAL,
                          'Server handler failed')
          end
        end
      end
    rescue Core::CallError, RuntimeError => e
      # these might happen for various reasons.  The correct behaviour of
      # the server is to log them and continue, if it's not shutting down.
      if running_state == :running
        GRPC.logger.warn("server call failed: #{e}")
      end
      next
    end
  end
  # @running_state should be :stopping here
  @run_mutex.synchronize { transition_running_state(:stopped) }
  GRPC.logger.info("stopped: #{self}")
end
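The inner rescue in the scheduled block keeps one failing handler from taking down the call loop: the error is turned into an INTERNAL status and the server moves on. A minimal sketch of that isolation pattern in plain Ruby follows. Outcome, run_isolated and the divide handler are hypothetical names; 0 and 13 are the standard gRPC numeric codes for OK and INTERNAL.

```ruby
STATUS_OK = 0        # gRPC status code OK
STATUS_INTERNAL = 13 # gRPC status code INTERNAL

# Hypothetical result record standing in for the status sent to the client.
Outcome = Struct.new(:code, :details)

# Runs one handler and converts any StandardError into an INTERNAL outcome,
# so a failure in one request never propagates to the caller's loop.
def run_isolated(handler, request)
  Outcome.new(STATUS_OK, handler.call(request).to_s)
rescue StandardError
  Outcome.new(STATUS_INTERNAL, 'Server handler failed')
end

# Integer division raises ZeroDivisionError for the second request.
divide = ->(pair) { pair[0] / pair[1] }
outcomes = [[6, 3], [1, 0], [8, 2]].map { |pair| run_isolated(divide, pair) }
```

The third request is still served after the second one fails, which is the behaviour the loop above aims for.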
#new_active_server_call(an_rpc) ⇒ Object
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 413
def new_active_server_call(an_rpc)
  return nil if an_rpc.nil? || an_rpc.call.nil?

  # allow the metadata to be accessed from the call
  an_rpc.call.metadata = an_rpc.metadata  # attaches md to call for handlers
  connect_md = nil
  unless @connect_md_proc.nil?
    connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata)
  end
  return nil unless available?(an_rpc)
  return nil unless implemented?(an_rpc)

  # Create the ActiveCall. Indicate that metadata hasn't been sent yet.
  GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
  rpc_desc = rpc_descs[an_rpc.method.to_sym]
  c = ActiveCall.new(an_rpc.call,
                     rpc_desc.marshal_proc,
                     rpc_desc.unmarshal_proc(:input),
                     an_rpc.deadline,
                     metadata_received: true,
                     started: false,
                     metadata_to_send: connect_md)
  mth = an_rpc.method.to_sym
  [c, mth]
end
#run ⇒ Object Also known as: run_till_terminated
runs the server
- if no rpc_descs are registered, this fails immediately with a RuntimeError; otherwise it keeps serving calls and does not return until the server is stopped (see #stop) or the program exits.
- #running? returns true after this is called, until #stop causes the server to stop.
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 338
def run
  @run_mutex.synchronize do
    fail 'cannot run without registering services' if rpc_descs.size.zero?
    @pool.start
    @server.start
    transition_running_state(:running)
    @run_cond.broadcast
  end
  loop_handle_server_calls
end
#running? ⇒ Boolean
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 267
def running?
  running_state == :running
end
#running_state ⇒ Object
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 247
def running_state
  @run_mutex.synchronize do
    return @running_state
  end
end
#stop ⇒ Object
stops a running server
The call has no effect if the server is already stopping or stopped; otherwise the server's current call loop is its last. It raises a RuntimeError if the server has not been started.
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 236
def stop
  @run_mutex.synchronize do
    fail 'Cannot stop before starting' if @running_state == :not_started
    return if @running_state != :running
    transition_running_state(:stopping)
  end
  deadline = from_relative_time(@poll_period)
  @server.close(deadline)
  @pool.stop
end
#stopped? ⇒ Boolean
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 271
def stopped?
  running_state == :stopped
end
#transition_running_state(target_state) ⇒ Object
Can only be called while holding @run_mutex
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 254
def transition_running_state(target_state)
  state_transitions = {
    not_started: :running,
    running: :stopping,
    stopping: :stopped
  }
  if state_transitions[@running_state] == target_state
    @running_state = target_state
  else
    fail "Bad server state transition: #{@running_state}->#{target_state}"
  end
end
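The strictly ordered lifecycle (:not_started, :running, :stopping, :stopped) can be tried out in isolation. The sketch below is a hypothetical LifecycleSketch class in plain Ruby, not part of gRPC. Unlike transition_running_state, which expects the caller to already hold @run_mutex, this version takes its own lock so that it is safe to call directly.

```ruby
# Hypothetical stand-in for the server's lifecycle bookkeeping.
class LifecycleSketch
  # Each state has exactly one legal successor; :stopped has none.
  TRANSITIONS = {
    not_started: :running,
    running: :stopping,
    stopping: :stopped
  }.freeze

  def initialize
    @mutex = Mutex.new
    @state = :not_started
  end

  def state
    @mutex.synchronize { @state }
  end

  # Moves to target only if it is the direct successor of the current state.
  def advance_to(target)
    @mutex.synchronize do
      unless TRANSITIONS[@state] == target
        raise "Bad server state transition: #{@state}->#{target}"
      end
      @state = target
    end
  end
end

lifecycle = LifecycleSketch.new
lifecycle.advance_to(:running)
lifecycle.advance_to(:stopping)
lifecycle.advance_to(:stopped)
```

Because the table holds a single successor per state, skipping a step (for example :not_started to :stopped) or moving backwards raises, and a stopped server cannot be restarted.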
#wait_till_running(timeout = nil) ⇒ Object
Is called from other threads to wait for #run to start up the server.
If #run has not been called yet, this blocks until the server starts or the timeout elapses; if #run has already been called, it returns immediately. It returns true only if the server is running.
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 281
def wait_till_running(timeout = nil)
  @run_mutex.synchronize do
    @run_cond.wait(@run_mutex, timeout) if @running_state == :not_started
    return @running_state == :running
  end
end
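The handshake between #run (which broadcasts on @run_cond) and wait_till_running (which waits on it) can be reproduced with Ruby's Mutex and ConditionVariable alone. StartupGate below is a hypothetical class written for this sketch; it keeps the single conditional wait used in the source rather than a retry loop, so a spurious wakeup would simply be reported as "not running".

```ruby
# Hypothetical gate that mimics the run / wait_till_running handshake.
class StartupGate
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @state = :not_started
  end

  # Called by the thread that starts the server.
  def mark_running
    @mutex.synchronize do
      @state = :running
      @cond.broadcast
    end
  end

  # Called by other threads; waits only while nothing has started yet and
  # returns true only if the state is :running afterwards.
  def wait_till_running(timeout = nil)
    @mutex.synchronize do
      @cond.wait(@mutex, timeout) if @state == :not_started
      @state == :running
    end
  end
end

gate = StartupGate.new
before_start = gate.wait_till_running(0.05) # nothing started: times out

starter = Thread.new do
  sleep 0.05
  gate.mark_running
end
after_start = gate.wait_till_running(2)     # wakes up on the broadcast
starter.join
```

A typical caller starts the server in a background thread, calls wait_till_running with a timeout from the main thread, and only then begins issuing requests.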