Class: GrpcKit::Server

Inherits:
Object
  • Object
show all
Defined in:
lib/grpc_kit/server.rb

Instance Method Summary collapse

Constructor Details

#initialize(interceptors: [], shutdown_timeout: 30, min_pool_size: nil, max_pool_size: nil, settings: []) ⇒ Server

Returns a new instance of Server.

Parameters:

  • interceptors (Array<GrpcKit::Grpc::ServerInterceptor>) (defaults to: [])

    list of interceptors

  • shutdown_timeout (Integer) (defaults to: 30)

    Number of seconds to wait for the server shutdown

  • min_pool_size (Integer) (defaults to: nil)

    A minimum thread pool size

  • max_pool_size (Integer) (defaults to: nil)

    A maximum thread pool size



13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/grpc_kit/server.rb', line 13

# Builds a server with no sessions and no registered RPCs.
#
# @param interceptors [Array] interceptors handed to every RPC built by #handle
# @param shutdown_timeout [Integer] seconds #graceful_shutdown waits before forcing
# @param min_pool_size [Integer, nil] falls back to GrpcKit::RpcDispatcher::DEFAULT_MIN
# @param max_pool_size [Integer, nil] falls back to GrpcKit::RpcDispatcher::DEFAULT_MAX
# @param settings [Array] passed to each session's submit_settings in #run
def initialize(interceptors: [], shutdown_timeout: 30, min_pool_size: nil, max_pool_size: nil, settings: [])
  # Caller-supplied configuration.
  @interceptors = interceptors
  @settings = settings
  @shutdown_timeout = shutdown_timeout

  # Pool bounds: use the dispatcher defaults when the caller gave none.
  @min_pool_size = min_pool_size || GrpcKit::RpcDispatcher::DEFAULT_MIN
  @max_pool_size = max_pool_size || GrpcKit::RpcDispatcher::DEFAULT_MAX

  # Runtime state.
  @mutex = Mutex.new
  @sessions = []
  @rpc_descs = {}
  @stopping = false

  GrpcKit.logger.debug("Launched grpc_kit(v#{GrpcKit::VERSION})")
end

Instance Method Details

#force_shutdown ⇒ void

This method returns an undefined value.

This method is expected to be called in trap context



58
59
60
61
62
63
64
65
# File 'lib/grpc_kit/server.rb', line 58

# Marks the server as stopping and shuts down its sessions from a new thread.
#
# Expected to be called in trap context. The work runs on a separate thread
# (presumably because logging and locking are restricted inside a signal
# handler -- confirm against the callers).
#
# @return [void] callers should not rely on the returned value
def force_shutdown
  @stopping = true

  Thread.new do
    GrpcKit.logger.debug('force shutdown')
    shutdown_sessions
  end
end

#graceful_shutdown(timeout: true) ⇒ void

This method returns an undefined value.

This method is expected to be called in trap context



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/grpc_kit/server.rb', line 70

# Marks the server as stopping, drains every session and waits for them to finish.
#
# Expected to be called in trap context; the work runs on a new thread.
#
# @param timeout [Boolean] when truthy, wait at most the configured shutdown
#   timeout and then shut the remaining sessions down; when falsy, wait
#   without a limit
# @return [void] callers should not rely on the returned value
def graceful_shutdown(timeout: true)
  @stopping = true

  Thread.new do
    GrpcKit.logger.debug('graceful shutdown')
    @mutex.synchronize do
      @sessions.each { |session| session.drain }
    end

    # Timeout.timeout treats 0 as "no time limit".
    wait_limit = timeout ? @shutdown_timeout : 0
    begin
      Timeout.timeout(wait_limit) do
        # Poll once per second until no sessions remain.
        sleep(1) until @sessions.empty?
      end
    rescue Timeout::Error
      GrpcKit.logger.error("Graceful shutdown is timeout (#{@shutdown_timeout}sec). Perform shutdown forceibly")
      shutdown_sessions
    end
  end
end

#handle(handler) ⇒ void

This method returns an undefined value.

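Parameters:

  • handler (GrpcKit::Grpc::GenericService)

    gRPC handler class or instance; its class must include GrpcKit::Grpc::GenericService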



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/grpc_kit/server.rb', line 29

# Registers every RPC described by the handler's class on this server.
#
# Registration is all-or-nothing: if any path is rejected, none of the
# handler's RPCs are added.
#
# @param handler [Class, Object] a class including GrpcKit::Grpc::GenericService,
#   or an instance of such a class
# @return [void] callers should not rely on the returned value
# @raise [RuntimeError] if the handler's class does not include
#   GrpcKit::Grpc::GenericService, or if one of its paths is already registered
def handle(handler)
  given_class = handler.is_a?(Class)
  klass = given_class ? handler : handler.class
  unless klass.include?(GrpcKit::Grpc::GenericService)
    raise "#{klass} must include Grpc::GenericService"
  end

  # Stage the new entries and publish them only after the whole handler has
  # been accepted, so a failure midway cannot leave half a service registered.
  staged = {}
  descs = klass.rpc_descs
  descs.each do |path, rpc_desc|
    if @rpc_descs[path] || staged[path]
      raise "Duplicated method registered #{path}, class: #{klass}"
    end

    # A class handler gets a fresh instance per RPC; an instance is shared.
    s = given_class ? handler.new : handler
    staged[path] = rpc_desc.build_server(s, interceptors: @interceptors)
  end

  @rpc_descs.merge!(staged)
  descs # same value the previous implementation returned
end

#run(conn) ⇒ void

This method returns an undefined value.

Parameters:

  • conn (TCPSocket)


47
48
49
50
51
52
53
54
# File 'lib/grpc_kit/server.rb', line 47

# Serves one connection: establishes a session on it, submits the configured
# settings and starts the session.
#
# @param conn [TCPSocket] accepted client connection
# @return [void] callers should not rely on the returned value
# @raise [RuntimeError] if a shutdown has already been requested
def run(conn)
  if @stopping
    raise 'Stopping server'
  end

  establish_session(conn) do |session|
    session.submit_settings(@settings)
    session.start
  end
end

#session_count ⇒ Object



89
90
91
# File 'lib/grpc_kit/server.rb', line 89

# Number of sessions currently tracked by the server, read under the mutex.
#
# @return [Integer]
def session_count
  @mutex.synchronize do
    @sessions.size
  end
end