Class: GRPC::RpcServer
- Inherits: Object (hierarchy: Object, GRPC::RpcServer)
- Extended by: Forwardable
- Includes: Core::CallOps, Core::TimeConsts
- Defined in: src/ruby/lib/grpc/generic/rpc_server.rb
Overview
RpcServer hosts a number of services and makes them available on the network.
Constant Summary
- DEFAULT_POOL_SIZE = 30
  Default thread pool size is 30
- DEFAULT_MAX_WAITING_REQUESTS = 20
  Deprecated due to internal changes to the thread pool
- DEFAULT_POLL_PERIOD = 1
  Default poll period is 1s
- SIGNAL_CHECK_PERIOD = 0.25
  Signal check period is 0.25s
Class Method Summary
- .setup_connect_md_proc(a_proc) ⇒ Object
  setup_connect_md_proc is used by #initialize to validate the connect_md_proc.
Instance Method Summary
- #available?(an_rpc) ⇒ Boolean
  Sends RESOURCE_EXHAUSTED if there are too many unprocessed jobs.
- #handle(service) ⇒ Object
  Handles registration of classes.
- #implemented?(an_rpc) ⇒ Boolean
  Sends UNIMPLEMENTED if the method is not implemented by this server.
- #initialize(pool_size: DEFAULT_POOL_SIZE, max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS, poll_period: DEFAULT_POLL_PERIOD, connect_md_proc: nil, server_args: {}) ⇒ RpcServer (constructor)
  Creates a new RpcServer.
- #loop_handle_server_calls ⇒ Object
  Handles calls to the server.
- #new_active_server_call(an_rpc) ⇒ Object
- #run ⇒ Object (also: #run_till_terminated)
  Runs the server.
- #running? ⇒ Boolean
- #running_state ⇒ Object
- #stop ⇒ Object
  Stops a running server.
- #stopped? ⇒ Boolean
- #transition_running_state(target_state) ⇒ Object
  Can only be called while holding @run_mutex.
- #wait_till_running(timeout = nil) ⇒ Object
  Is called from other threads to wait for #run to start up the server.
Methods included from Core::TimeConsts
Constructor Details
#initialize(pool_size: DEFAULT_POOL_SIZE, max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS, poll_period: DEFAULT_POLL_PERIOD, connect_md_proc: nil, server_args: {}) ⇒ RpcServer
Creates a new RpcServer.
The RPC server is configured using the following keyword arguments:
- pool_size: the size of the thread pool the server uses to run its threads. No more concurrent requests can be served than the size of the thread pool.
- max_waiting_requests: deprecated due to internal changes to the thread pool. It is still accepted for compatibility but is ignored.
- poll_period: when present, the server polls for new events with this period.
- connect_md_proc: when non-nil, a proc for determining the metadata to send back to the client on receiving an invocation request. The proc signature is: {key: val, ..} func(method_name, {key: val, ...})
- server_args: a server arguments hash to be passed down to the underlying core server.
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 199
def initialize(pool_size:DEFAULT_POOL_SIZE,
               max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS,
               poll_period:DEFAULT_POLL_PERIOD,
               connect_md_proc:nil,
               server_args:{})
  @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
  @max_waiting_requests = max_waiting_requests
  @poll_period = poll_period
  @pool_size = pool_size
  @pool = Pool.new(@pool_size)
  @run_cond = ConditionVariable.new
  @run_mutex = Mutex.new
  # running_state can take 4 values: :not_started, :running, :stopping, and
  # :stopped. State transitions can only proceed in that order.
  @running_state = :not_started
  @server = Core::Server.new(server_args)
end
Class Method Details
.setup_connect_md_proc(a_proc) ⇒ Object
setup_connect_md_proc is used by #initialize to validate the connect_md_proc.
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 169
def self.setup_connect_md_proc(a_proc)
  return nil if a_proc.nil?
  fail(TypeError, '!Proc') unless a_proc.is_a? Proc
  a_proc
end
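As a rough illustration of what this validation accepts, the sketch below repeats the check in a standalone method and feeds it a few values. The names validate_connect_md_proc and ECHO_METHOD_MD, and the 'x-invoked-method' key, are made up for the example and are not part of the gem. The proc is assumed to take the method name and the request metadata and to return a hash, as the constructor documentation describes.

```ruby
# Standalone copy of the check, for illustration only; the real method is
# GRPC::RpcServer.setup_connect_md_proc.
def validate_connect_md_proc(a_proc)
  return nil if a_proc.nil?
  fail(TypeError, '!Proc') unless a_proc.is_a? Proc
  a_proc
end

# Hypothetical connect_md_proc: echoes the invoked method name back as metadata.
ECHO_METHOD_MD = proc do |method_name, _request_md|
  { 'x-invoked-method' => method_name.to_s }
end

validate_connect_md_proc(nil)             # nil is allowed: no connect metadata
validate_connect_md_proc(ECHO_METHOD_MD)  # a Proc is returned unchanged

begin
  validate_connect_md_proc('not a proc')
rescue TypeError => e
  puts "rejected: #{e.message}"           # prints "rejected: !Proc"
end
```

Because the check is `is_a? Proc`, a lambda passes as well, while any other callable object (for example one that only responds to #call) is rejected.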
Instance Method Details
#available?(an_rpc) ⇒ Boolean
Sends RESOURCE_EXHAUSTED if there are too many unprocessed jobs
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 337
def available?(an_rpc)
  return an_rpc if @pool.ready_for_work?
  GRPC.logger.warn('no free worker threads currently')
  noop = proc { |x| x }

  # Create a new active call that knows that metadata hasn't been
  # sent yet
  c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
                     metadata_received: true, started: false)
  c.send_status(GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED, '')
  nil
end
#handle(service) ⇒ Object
Handles registration of classes.
service is either a class that includes GRPC::GenericService and whose #new method can be called without arguments, or any instance of such a class.
E.g., after

  class Divider
    include GRPC::GenericService
    rpc :div, DivArgs, DivReply   # single request, single response
    def initialize(optional_arg='default option')  # no args
      ...
    end
  end

  srv = GRPC::RpcServer.new(...)

  # Either of these works
  srv.handle(Divider)
  # or
  srv.handle(Divider.new('replace optional arg'))

It raises RuntimeError:
- if service is not a valid service class or object
- if its handler methods are already registered
- if the server has already been started
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 305
def handle(service)
  @run_mutex.synchronize do
    unless @running_state == :not_started
      fail 'cannot add services if the server has been started'
    end
    cls = service.is_a?(Class) ? service : service.class
    assert_valid_service_class(cls)
    add_rpc_descs_for(service)
  end
end
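The class-or-instance handling can be tried out in isolation. The following is a minimal pure-Ruby sketch: DividerSketch and service_class_of are hypothetical names, and the class deliberately does not include GRPC::GenericService so that the snippet runs without the gem.

```ruby
# Hypothetical service class; a real one would include GRPC::GenericService.
class DividerSketch
  attr_reader :option

  # The argument is optional, so DividerSketch.new also works with no args.
  def initialize(optional_arg = 'default option')
    @option = optional_arg
  end
end

# Mirrors the normalization in #handle: accept a class or an instance and
# recover the class either way.
def service_class_of(service)
  service.is_a?(Class) ? service : service.class
end

puts service_class_of(DividerSketch)                  # DividerSketch
puts service_class_of(DividerSketch.new('override'))  # DividerSketch
```

Passing an instance is the way to supply constructor arguments; passing the class relies on the no-argument form of #new.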
#implemented?(an_rpc) ⇒ Boolean
Sends UNIMPLEMENTED if the method is not implemented by this server
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 351
def implemented?(an_rpc)
  mth = an_rpc.method.to_sym
  return an_rpc if rpc_descs.key?(mth)
  GRPC.logger.warn("UNIMPLEMENTED: #{an_rpc}")
  noop = proc { |x| x }

  # Create a new active call that knows that
  # metadata hasn't been sent yet
  c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
                     metadata_received: true, started: false)
  c.send_status(GRPC::Core::StatusCodes::UNIMPLEMENTED, '')
  nil
end
#loop_handle_server_calls ⇒ Object
handles calls to the server
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 366
def loop_handle_server_calls
  fail 'not started' if running_state == :not_started
  while running_state == :running
    begin
      an_rpc = @server.request_call
      break if (!an_rpc.nil?) && an_rpc.call.nil?
      active_call = new_active_server_call(an_rpc)
      unless active_call.nil?
        @pool.schedule(active_call) do |ac|
          c, mth = ac
          begin
            rpc_descs[mth].run_server_method(c, rpc_handlers[mth])
          rescue StandardError
            c.send_status(GRPC::Core::StatusCodes::INTERNAL,
                          'Server handler failed')
          end
        end
      end
    rescue Core::CallError, RuntimeError => e
      # these might happen for various reasons. The correct behaviour of
      # the server is to log them and continue, if it's not shutting down.
      if running_state == :running
        GRPC.logger.warn("server call failed: #{e}")
      end
      next
    end
  end
  # @running_state should be :stopping here
  @run_mutex.synchronize { transition_running_state(:stopped) }
  GRPC.logger.info("stopped: #{self}")
end
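The inner begin/rescue in the scheduled block turns any handler failure into an INTERNAL status rather than letting it escape into the worker thread. A minimal sketch of that pattern, without the gem, might look as follows. FakeCall and run_guarded are hypothetical stand-ins; the numeric values 0 and 13 are the standard gRPC codes for OK and INTERNAL.

```ruby
# Hypothetical stand-in for an active call; it only records the status sent.
FakeCall = Struct.new(:status, :details) do
  def send_status(code, details)
    self.status = code
    self.details = details
  end
end

OK = 0        # numeric value of the gRPC OK status code
INTERNAL = 13 # numeric value of the gRPC INTERNAL status code

# Runs a handler for a call; any StandardError becomes an INTERNAL status
# instead of propagating to the caller.
def run_guarded(call, handler)
  handler.call(call)
rescue StandardError
  call.send_status(INTERNAL, 'Server handler failed')
end

ok_call = FakeCall.new
run_guarded(ok_call, proc { |c| c.send_status(OK, 'OK') })

bad_call = FakeCall.new
run_guarded(bad_call, proc { |_c| raise ArgumentError, 'boom' })

puts ok_call.status   # 0
puts bad_call.status  # 13
```

Note that the client only sees the fixed 'Server handler failed' detail string; the original exception message is not forwarded.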
#new_active_server_call(an_rpc) ⇒ Object
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 398
def new_active_server_call(an_rpc)
  return nil if an_rpc.nil? || an_rpc.call.nil?

  # allow the metadata to be accessed from the call
  an_rpc.call.metadata = an_rpc.metadata  # attaches md to call for handlers
  connect_md = nil
  unless @connect_md_proc.nil?
    connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata)
  end
  return nil unless available?(an_rpc)
  return nil unless implemented?(an_rpc)

  # Create the ActiveCall. Indicate that metadata hasn't been sent yet.
  GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
  rpc_desc = rpc_descs[an_rpc.method.to_sym]
  c = ActiveCall.new(an_rpc.call,
                     rpc_desc.marshal_proc,
                     rpc_desc.unmarshal_proc(:input),
                     an_rpc.deadline,
                     metadata_received: true,
                     started: false,
                     metadata_to_send: connect_md)
  c.attach_peer_cert(an_rpc.call.peer_cert)
  mth = an_rpc.method.to_sym
  [c, mth]
end
#run ⇒ Object Also known as: run_till_terminated
Runs the server.
- If no rpc_descs are registered, this raises a RuntimeError immediately; otherwise it blocks, handling calls, and does not return until the server is stopped.
- #running? returns true after this is called, until #stop causes the server to stop.
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 323
def run
  @run_mutex.synchronize do
    fail 'cannot run without registering services' if rpc_descs.size.zero?
    @pool.start
    @server.start
    transition_running_state(:running)
    @run_cond.broadcast
  end
  loop_handle_server_calls
end
#running? ⇒ Boolean
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 252
def running?
  running_state == :running
end
#running_state ⇒ Object
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 232
def running_state
  @run_mutex.synchronize do
    return @running_state
  end
end
#stop ⇒ Object
Stops a running server.
The call has no effect if the server is already stopping or stopped; otherwise the server's current call loop is its last. It raises RuntimeError if the server has not been started.
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 221
def stop
  @run_mutex.synchronize do
    fail 'Cannot stop before starting' if @running_state == :not_started
    return if @running_state != :running
    transition_running_state(:stopping)
  end
  deadline = from_relative_time(@poll_period)
  @server.close(deadline)
  @pool.stop
end
#stopped? ⇒ Boolean
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 256
def stopped?
  running_state == :stopped
end
#transition_running_state(target_state) ⇒ Object
Can only be called while holding @run_mutex
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 239
def transition_running_state(target_state)
  state_transitions = {
    not_started: :running,
    running: :stopping,
    stopping: :stopped
  }
  if state_transitions[@running_state] == target_state
    @running_state = target_state
  else
    fail "Bad server state transition: #{@running_state}->#{target_state}"
  end
end
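The transition table only allows each state to move to its direct successor, so states cannot be skipped or revisited. A small standalone sketch of the same table is shown below. LifecycleSketch is a hypothetical class used only for illustration, and it omits the mutex that the real method relies on.

```ruby
# Minimal stand-in for the server's lifecycle state, for illustration only.
class LifecycleSketch
  TRANSITIONS = {
    not_started: :running,
    running: :stopping,
    stopping: :stopped
  }.freeze

  attr_reader :state

  def initialize
    @state = :not_started
  end

  # Moves to target only if it is the direct successor of the current state.
  def transition(target)
    unless TRANSITIONS[@state] == target
      fail "Bad server state transition: #{@state}->#{target}"
    end
    @state = target
  end
end

sketch = LifecycleSketch.new
sketch.transition(:running)
sketch.transition(:stopping)
sketch.transition(:stopped)

begin
  sketch.transition(:running) # :stopped has no successor
rescue RuntimeError => e
  puts e.message # prints "Bad server state transition: stopped->running"
end
```

Since :stopped has no entry in the table, a server that has been stopped cannot be restarted through this state machine.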
#wait_till_running(timeout = nil) ⇒ Object
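Is called from other threads to wait for #run to start up the server.
If the server has already left the :not_started state, this returns immediately. Otherwise it waits until #run signals startup, or until timeout seconds have passed when a timeout is given. It returns true if the server is running, false otherwise.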
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 266
def wait_till_running(timeout = nil)
  @run_mutex.synchronize do
    @run_cond.wait(@run_mutex, timeout) if @running_state == :not_started
    return @running_state == :running
  end
end
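The handshake between #run (which broadcasts on @run_cond) and #wait_till_running can be reproduced with plain Ruby threads. The sketch below is a simplified model, not the gem's code: StartupGate and its start method are hypothetical, with start standing in for the part of #run that flips the state and broadcasts.

```ruby
# Pure-Ruby sketch of the wait/broadcast handshake; StartupGate is a
# hypothetical stand-in, not a gRPC class.
class StartupGate
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @state = :not_started
  end

  # Plays the role of #run: flips the state and wakes every waiter.
  def start
    @mutex.synchronize do
      @state = :running
      @cond.broadcast
    end
  end

  # Plays the role of #wait_till_running.
  def wait_till_running(timeout = nil)
    @mutex.synchronize do
      @cond.wait(@mutex, timeout) if @state == :not_started
      return @state == :running
    end
  end
end

gate = StartupGate.new
puts gate.wait_till_running(0.05) # nothing has started it yet: false

starter = Thread.new do
  sleep 0.05
  gate.start
end
puts gate.wait_till_running(5)    # woken by the broadcast: true
starter.join
```

Like the original, the sketch checks the state with `if` rather than in a loop, so a spurious wakeup would make the call return false before startup; callers that need certainty can simply call it again.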