Class: GrpcKit::Server

Inherits:
Object
  • Object
show all
Defined in:
lib/grpc_kit/server.rb

Instance Method Summary collapse

Constructor Details

#initialize(interceptors: [], shutdown_timeout: 30, min_pool_size: nil, max_pool_size: nil, settings: []) ⇒ Server



13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/grpc_kit/server.rb', line 13

def initialize(interceptors: [], shutdown_timeout: 30, min_pool_size: nil, max_pool_size: nil, settings: [])
  @interceptors = interceptors
  @shutdown_timeout = shutdown_timeout
  @min_pool_size = min_pool_size || GrpcKit::RpcDispatcher::DEFAULT_MIN
  @max_pool_size = max_pool_size || GrpcKit::RpcDispatcher::DEFAULT_MAX
  @sessions = []
  @rpc_descs = {}
  @mutex = Mutex.new
  @stopping = false
  @settings = settings

  GrpcKit.logger.debug("Launched grpc_kit(v#{GrpcKit::VERSION})")
end

Instance Method Details

#force_shutdownvoid

This method returns an undefined value.

This method is expected to be called in trap context



58
59
60
61
62
63
64
65
# File 'lib/grpc_kit/server.rb', line 58

def force_shutdown
  @stopping = true

  Thread.new {
    GrpcKit.logger.debug('force shutdown')
    shutdown_sessions
  }
end

#graceful_shutdown(timeout: true) ⇒ void

This method returns an undefined value.

This method is expected to be called in trap context



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/grpc_kit/server.rb', line 70

def graceful_shutdown(timeout: true)
  @stopping = true

  Thread.new do
    GrpcKit.logger.debug('graceful shutdown')
    @mutex.synchronize { @sessions.each(&:drain) }

    begin
      sec = timeout ? @shutdown_timeout : 0
      Timeout.timeout(sec) do
        sleep 1 until @sessions.empty?
      end
    rescue Timeout::Error => _
      GrpcKit.logger.error("Graceful shutdown is timeout (#{@shutdown_timeout}sec). Perform shutdown forceibly")
      shutdown_sessions
    end
  end
end

#handle(handler) ⇒ void



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/grpc_kit/server.rb', line 29

def handle(handler)
  klass = handler.is_a?(Class) ? handler : handler.class
  unless klass.include?(GrpcKit::Grpc::GenericService)
    raise "#{klass} must include Grpc::GenericService"
  end

  klass.rpc_descs.each do |path, rpc_desc|
    if @rpc_descs[path]
      raise "Duplicated method registered #{path}, class: #{klass}"
    end

    s = handler.is_a?(Class) ? handler.new : handler
    @rpc_descs[path] = rpc_desc.build_server(s, interceptors: @interceptors)
  end
end

#run(conn) ⇒ void



47
48
49
50
51
52
53
54
# File 'lib/grpc_kit/server.rb', line 47

def run(conn)
  raise 'Stopping server' if @stopping

  establish_session(conn) do |s|
    s.submit_settings(@settings)
    s.start
  end
end

#session_countObject



89
90
91
# File 'lib/grpc_kit/server.rb', line 89

def session_count
  @mutex.synchronize { @sessions.size }
end