Class: GRPC::RpcServer

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Core::CompletionType, Core::TimeConsts
Defined in:
lib/grpc/generic/rpc_server.rb

Overview

RpcServer hosts a number of services and makes them available on the network.

Defined Under Namespace

Classes: Pool

Constant Summary collapse

DEFAULT_POOL_SIZE =

Default thread pool size is 3

3
DEFAULT_MAX_WAITING_REQUESTS =

Default max_waiting_requests size is 20

20

Instance Method Summary collapse

Methods included from Core::TimeConsts

from_relative_time

Constructor Details

#initialize(pool_size: DEFAULT_POOL_SIZE, max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS, poll_period: INFINITE_FUTURE, completion_queue_override: nil, creds: nil, server_override: nil, **kw) ⇒ RpcServer

Creates a new RpcServer.

The RPC server is configured using keyword arguments.

There are some specific keyword args used to configure the RpcServer instance; however, other arbitrary keyword args are allowed and, when present, are used to configure the listening connection set up by the RpcServer.

  • server_override: which if passed must be a [GRPC::Core::Server]. When

present, it is used as the server instead of a newly created one.

  • poll_period: when present, the server polls for new events with this

period

  • pool_size: the size of the thread pool the server uses to run its

threads

  • completion_queue_override: when supplied, this will be used as the

completion_queue that the server uses to receive network events, otherwise it creates a new instance itself

  • creds: [GRPC::Core::ServerCredentials]

the credentials used to secure the server

  • max_waiting_requests: the maximum number of requests that may be

waiting to be handled. When this limit is exceeded, the server responds to new requests with UNAVAILABLE



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/grpc/generic/rpc_server.rb', line 80

# Builds a new RpcServer from keyword arguments.
#
# Keyword arguments not named below are collected into +kw+ and handed to
# Core::Server.new when this method constructs the server itself.
#
# @param pool_size [Object] size passed to Pool.new
# @param max_waiting_requests [Object] stored as @max_waiting_requests
# @param poll_period [Object] stored as @poll_period
# @param completion_queue_override [Core::CompletionQueue, nil] queue to use
#   instead of creating a fresh one
# @param creds [Core::ServerCredentials, nil] credentials passed to
#   Core::Server.new; only consulted when server_override is nil
# @param server_override [Core::Server, nil] server to use instead of
#   creating a fresh one
# @raise [ArgumentError] if an override or creds has the wrong type
def initialize(pool_size:DEFAULT_POOL_SIZE,
               max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS,
               poll_period:INFINITE_FUTURE,
               completion_queue_override:nil,
               creds:nil,
               server_override:nil,
               **kw)
  # Completion queue: take the override if it is valid, else make one.
  queue = completion_queue_override
  if queue.nil?
    queue = Core::CompletionQueue.new
  elsif !queue.is_a?(Core::CompletionQueue)
    fail(ArgumentError, 'not a CompletionQueue')
  end
  @cq = queue

  # Server: an override wins; creds are only checked when building one here.
  server = server_override
  if !server.nil?
    fail(ArgumentError, 'not a Server') unless server.is_a?(Core::Server)
  elsif creds.nil?
    server = Core::Server.new(@cq, kw)
  elsif creds.is_a?(Core::ServerCredentials)
    server = Core::Server.new(@cq, kw, creds)
  else
    fail(ArgumentError, 'not a ServerCredentials')
  end
  @server = server

  @run_mutex = Mutex.new
  @run_cond = ConditionVariable.new
  @poll_period = poll_period
  @max_waiting_requests = max_waiting_requests
  @pool_size = pool_size
  @pool = Pool.new(pool_size)
end

Instance Method Details

#handle(service) ⇒ Object

handle registration of classes

service is either a class that includes GRPC::GenericService and whose #new function can be called without argument or any instance of such a class.

E.g, after

class Divider

include GRPC::GenericService
rpc :div, DivArgs, DivReply    # single request, single response
def initialize(optional_arg='default option') # no args
  ...
end

srv = GRPC::RpcServer.new(…)

# Either of these works

srv.handle(Divider)

# or

srv.handle(Divider.new('replace optional arg'))

It raises RuntimeError:

  • if service is not a valid service class or object

  • if its handler methods are already registered

  • if the server is already running

Parameters:

  • service (Object|Class)

    a service class or object as described above



186
187
188
189
190
191
192
# File 'lib/grpc/generic/rpc_server.rb', line 186

# Registers a service with the server.
#
# +service+ may be a service class or an instance of one; in both cases the
# class is validated and the service itself is passed on for registration.
#
# @param service [Object, Class] the service class or instance to register
# @return [Object] whatever add_rpc_descs_for returns
# @raise [RuntimeError] if the server is running or has been stopped
def handle(service)
  if running?
    fail 'cannot add services if the server is running'
  elsif stopped?
    fail 'cannot add services if the server is stopped'
  end

  # Validate the class whether a class or an instance was supplied.
  service_class =
    case service
    when Class then service
    else service.class
    end
  assert_valid_service_class(service_class)
  add_rpc_descs_for(service)
end

#new_active_server_call(call, new_server_rpc) ⇒ Object



234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
# File 'lib/grpc/generic/rpc_server.rb', line 234

# Accepts an incoming call and wraps it in an ActiveCall.
#
# The call is always accepted first, even when it is about to be rejected,
# because a status can only be sent on an accepted call. The call is then
# rejected with UNAVAILABLE when the pool has more waiting jobs than
# @max_waiting_requests, or with NOT_FOUND when no rpc_desc is registered for
# the requested method.
#
# @param call [Object] the incoming call; must respond to metadata=,
#   server_accept and server_end_initial_metadata
# @param new_server_rpc [Object] the new-RPC details; must respond to
#   metadata, deadline and method
# @return [ActiveCall, nil] the call to dispatch, or nil if it was rejected
def new_active_server_call(call, new_server_rpc)
  # Accept the call.  This is necessary even if a status is to be sent
  # back immediately
  finished_tag = Object.new
  call_queue = Core::CompletionQueue.new
  # NOTE(review): the attribute names on the assignment below and on the
  # call after server_accept were missing from the reviewed text. `metadata`
  # follows from the trailing comment; `server_end_initial_metadata` is the
  # assumed name - confirm both against the Core::Call API.
  call.metadata = new_server_rpc.metadata  # store the metadata
  call.server_accept(call_queue, finished_tag)
  call.server_end_initial_metadata

  # Rejects the accepted call with the given status code; evaluates to nil so
  # it can be used directly as this method's return value.
  noop = proc { |x| x }
  reject = lambda do |code|
    c = ActiveCall.new(call, call_queue, noop, noop,
                       new_server_rpc.deadline,
                       finished_tag: finished_tag)
    c.send_status(code, '')
    nil
  end

  # Send UNAVAILABLE if there are too many unprocessed jobs.  The snapshot
  # taken here is both logged and compared, so the two always agree.
  jobs_count, max = @pool.jobs_waiting, @max_waiting_requests
  logger.info("waiting: #{jobs_count}, max: #{max}")
  if jobs_count > max
    logger.warn("NOT AVAILABLE: too many jobs_waiting: #{new_server_rpc}")
    return reject.call(StatusCodes::UNAVAILABLE)
  end

  # Send NOT_FOUND if the method does not exist
  mth = new_server_rpc.method.to_sym
  unless rpc_descs.key?(mth)
    logger.warn("NOT_FOUND: #{new_server_rpc}")
    return reject.call(StatusCodes::NOT_FOUND)
  end

  # Create the ActiveCall
  rpc_desc = rpc_descs[mth]
  logger.info("deadline is #{new_server_rpc.deadline}; (now=#{Time.now})")
  ActiveCall.new(call, call_queue,
                 rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input),
                 new_server_rpc.deadline, finished_tag: finished_tag)
end

#run ⇒ Object

runs the server

  • if no rpc_descs are registered, this exits immediately, otherwise it keeps running and does not return until the server is stopped (or the program exits).

  • #running? returns true after this is called, until #stop causes the server to stop.



201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/grpc/generic/rpc_server.rb', line 201

# Runs the server's request loop on the calling thread.
#
# Returns straight away (after logging a warning) when no rpc_descs are
# registered. Otherwise it marks the server as running, starts the pool and
# the server, and keeps requesting and dispatching calls until stopped?
# becomes true, at which point @running is reset to false.
#
# @return [nil, false] nil when nothing is registered, false after the loop
def run
  if rpc_descs.size.zero?
    logger.warn('did not run as no services were present')
    return
  end

  # Flag the server as running and wake a thread waiting on @run_cond.
  @run_mutex.synchronize do
    @running = true
    @run_cond.signal
  end
  @pool.start
  @server.start

  request_tag = Object.new
  until stopped?
    @server.request_call(request_tag)
    event = @cq.pluck(request_tag, @poll_period)
    next if event.nil?

    # Anything other than a new-RPC event is logged and discarded.
    unless event.type == SERVER_RPC_NEW
      logger.warn("bad evt: got:#{event.type}, want:#{SERVER_RPC_NEW}")
      event.close
      next
    end

    active_call = new_active_server_call(event.call, event.result)
    next if active_call.nil?

    # Read the method name before closing the event, then hand the call to
    # the pool for execution.
    method_key = event.result.method.to_sym
    event.close
    @pool.schedule(active_call) do |call|
      rpc_descs[method_key].run_server_method(call, rpc_handlers[method_key])
    end
  end
  @running = false
end

#running? ⇒ Boolean

determines if the server is currently running

Returns:

  • (Boolean)


130
131
132
# File 'lib/grpc/generic/rpc_server.rb', line 130

# Reports whether the server is currently running.
#
# An unset or falsy @running is normalized to false, so the result is never
# nil.
#
# @return [Boolean] the truthy @running value, or false
def running?
  return @running if @running
  @running = false
end

#stop ⇒ Object

stops a running server

the call has no impact if the server is already stopped; otherwise the server's current call loop is its last.



123
124
125
126
127
# File 'lib/grpc/generic/rpc_server.rb', line 123

# Stops a running server.
#
# Does nothing when @running is falsy. Otherwise sets @stopped and stops the
# pool.
#
# @return [Object, nil] the result of @pool.stop, or nil if not running
def stop
  if @running
    @stopped = true
    @pool.stop
  end
end

#stopped? ⇒ Boolean

determines if the server is currently stopped

Returns:

  • (Boolean)


150
151
152
# File 'lib/grpc/generic/rpc_server.rb', line 150

# Reports whether the server has been stopped.
#
# An unset or falsy @stopped is normalized to false, so the result is never
# nil.
#
# @return [Boolean] the truthy @stopped value, or false
def stopped?
  @stopped = false unless @stopped
  @stopped
end

#wait_till_running(timeout = 0.1) ⇒ Object

Is called from other threads to wait for #run to start up the server.

If run has not been called, this returns immediately.

Parameters:

  • timeout (Numeric) (defaults to: 0.1)

    number of seconds to wait



140
141
142
143
144
145
146
147
# File 'lib/grpc/generic/rpc_server.rb', line 140

# Blocks the calling thread until the server is running or +timeout+ elapses.
#
# Intended to be called from a thread other than the one executing #run.
# The running check and the wait both happen while holding @run_mutex, the
# same mutex #run holds when it sets @running and signals @run_cond, so a
# signal sent between the check and the wait cannot be missed. The wait is
# bounded by the remaining time, so this never blocks past the deadline even
# if #run is never called, and it returns as soon as the server is running.
#
# @param timeout [Numeric] maximum number of seconds to wait
# @return [Boolean] the value of running? when the wait ends
def wait_till_running(timeout = 0.1)
  end_time = Time.now + timeout
  @run_mutex.synchronize do
    # Loop to cope with spurious wakeups and with signals meant for others.
    until running?
      remaining = end_time - Time.now
      break unless remaining > 0
      @run_cond.wait(@run_mutex, remaining)
    end
  end
  running?
end