Class: GrpcKit::Server

Inherits:
Object
  • Object
show all
Defined in:
lib/grpc_kit/server.rb

Instance Method Summary collapse

Constructor Details

#initialize(interceptors: [], shutdown_timeout: 30, min_pool_size: nil, max_pool_size: nil) ⇒ Server

Returns a new instance of Server.

Parameters:

  • interceptors (defaults to: [])

    list of interceptors

  • shutdown_timeout (defaults to: 30)

    Number of seconds to wait for the server shutdown

  • min_pool_size (defaults to: nil)

    A minimum thread pool size

  • max_pool_size (defaults to: nil)

    A maximum thread pool size



13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/grpc_kit/server.rb', line 13

# Sets up a server with no registered handlers and no active sessions.
#
# @param interceptors [Array] list of interceptors, passed to every RPC
#   built by #handle
# @param shutdown_timeout [Numeric] number of seconds #graceful_shutdown
#   waits before forcing the shutdown
# @param min_pool_size [Integer, nil] minimum thread pool size; nil selects
#   GrpcKit::RpcDispatcher::DEFAULT_MIN
# @param max_pool_size [Integer, nil] maximum thread pool size; nil selects
#   GrpcKit::RpcDispatcher::DEFAULT_MAX
def initialize(interceptors: [], shutdown_timeout: 30, min_pool_size: nil, max_pool_size: nil)
  # Caller-supplied configuration.
  @interceptors = interceptors
  @shutdown_timeout = shutdown_timeout
  @min_pool_size = min_pool_size || GrpcKit::RpcDispatcher::DEFAULT_MIN
  @max_pool_size = max_pool_size || GrpcKit::RpcDispatcher::DEFAULT_MAX

  # Mutable runtime state; @mutex guards access to @sessions.
  @mutex = Mutex.new
  @sessions = []
  @rpc_descs = {}
  @stopping = false

  version = GrpcKit::VERSION
  GrpcKit.logger.debug("Launched grpc_kit(v#{version})")
end

Instance Method Details

#force_shutdown ⇒ void

This method returns an undefined value.

This method is expected to be called in trap context



57
58
59
60
61
62
63
64
# File 'lib/grpc_kit/server.rb', line 57

# Marks the server as stopping and shuts every session down immediately.
#
# This method is expected to be called in trap context, so the actual work
# (logging and shutdown_sessions) is handed to a newly spawned thread.
#
# @return [Thread] the thread performing the shutdown (documented as void;
#   callers should not rely on it)
def force_shutdown
  @stopping = true

  Thread.new do
    GrpcKit.logger.debug('force shutdown')
    shutdown_sessions
  end
end

#graceful_shutdown ⇒ void

This method returns an undefined value.

This method is expected to be called in trap context



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/grpc_kit/server.rb', line 68

# Marks the server as stopping, asks every session to drain, and waits for
# the session list to become empty. If the sessions have not all gone away
# within @shutdown_timeout seconds, an error is logged and shutdown_sessions
# is called to force the shutdown.
#
# This method is expected to be called in trap context, so everything that
# logs or takes @mutex runs on a newly spawned thread.
#
# @return [Thread] the thread performing the shutdown (documented as void;
#   callers should not rely on it)
def graceful_shutdown
  @stopping = true

  Thread.new do
    GrpcKit.logger.debug('graceful shutdown')
    @mutex.synchronize { @sessions.each(&:drain) }

    # Use the monotonic clock so wall-clock adjustments cannot shorten or
    # stretch the timeout.
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @shutdown_timeout
    loop do
      if deadline < Process.clock_gettime(Process::CLOCK_MONOTONIC)
        GrpcKit.logger.error("Graceful shutdown is timeout (#{@shutdown_timeout}sec). Perform shutdown forceibly")
        shutdown_sessions
        break
      end

      # Read @sessions under the same lock the other accessors use.
      break if @mutex.synchronize { @sessions.empty? }

      # Poll instead of spinning: without this pause the loop would burn a
      # CPU core for the whole timeout window.
      sleep(0.1)
    end
  end
end

#handle(handler) ⇒ void

This method returns an undefined value.

Parameters:

  • handler — gRPC handler object or class



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/grpc_kit/server.rb', line 28

# Registers every RPC described by a handler so it can be served.
#
# The handler may be given as a class or as an instance. When a class is
# given, a fresh instance is created for each RPC path it describes.
#
# @param handler [Object, Class] gRPC handler object or class; its class
#   must include GrpcKit::Grpc::GenericService
# @raise [RuntimeError] if the handler's class does not include
#   GrpcKit::Grpc::GenericService, or if one of its paths is already registered
def handle(handler)
  given_class = handler.is_a?(Class)
  klass = given_class ? handler : handler.class
  raise "#{klass} must include Grpc::GenericService" unless klass.include?(GrpcKit::Grpc::GenericService)

  klass.rpc_descs.each do |path, rpc_desc|
    raise "Duplicated method registered #{path}, class: #{klass}" if @rpc_descs[path]

    service = given_class ? handler.new : handler
    @rpc_descs[path] = rpc_desc.build_server(service, interceptors: @interceptors)
  end
end

#run(conn) ⇒ void

This method returns an undefined value.

Parameters:



46
47
48
49
50
51
52
53
# File 'lib/grpc_kit/server.rb', line 46

# Serves a single connection: establishes a session for it, submits an
# empty settings list, and starts the session.
#
# @param conn [Object] connection handed to establish_session
#   (presumably a socket; confirm against the caller)
# @raise [RuntimeError] 'Stopping server' when @stopping has been set
def run(conn)
  if @stopping
    raise 'Stopping server'
  end

  establish_session(conn) do |session|
    session.submit_settings([])
    session.start
  end
end

#session_count ⇒ Object



88
89
90
# File 'lib/grpc_kit/server.rb', line 88

# Number of sessions currently tracked in @sessions, read while holding
# @mutex.
#
# @return [Integer]
def session_count
  @mutex.synchronize do
    @sessions.length
  end
end