Class: GrpcKit::Server

Inherits:
Object
  • Object
show all
Defined in:
lib/grpc_kit/server.rb

Instance Method Summary collapse

Constructor Details

#initialize(interceptors: []) ⇒ Server

Returns a new instance of Server.



8
9
10
11
12
13
14
15
16
17
# File 'lib/grpc_kit/server.rb', line 8

def initialize(interceptors: [])
  @sessions = []
  @rpc_descs = {}
  @interceptors = interceptors
  @mutex = Mutex.new
  @stopping = false
  @max_graceful_wait = 60

  GrpcKit.logger.debug("Launched grpc_kit(v#{GrpcKit::VERSION})")
end

Instance Method Details

#dispatch(path, stream) ⇒ Object



82
83
84
85
86
87
88
89
90
91
# File 'lib/grpc_kit/server.rb', line 82

def dispatch(path, stream)
  rpc = @rpc_descs[path]
  unless rpc
    e = GrpcKit::Errors::Unimplemented.new(path)
    stream.send_status(status: e.code, msg: e.message)
    return
  end

  stream.invoke(rpc)
end

#force_shutdownObject



44
45
46
47
48
49
50
51
52
53
# File 'lib/grpc_kit/server.rb', line 44

def force_shutdown
  # expected to be called in trap context
  Thread.new do
    @mutex.synchronize do
      GrpcKit.logger.debug('force shutdown')
      @stopping = true
      @sessions.each(&:shutdown)
    end
  end
end

#graceful_shutdownObject



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/grpc_kit/server.rb', line 55

def graceful_shutdown
  # expected to be called in trap context
  Thread.new do
    GrpcKit.logger.debug('graceful shutdown')
    @mutex.synchronize { @sessions.each(&:drain) }
    @stopping = true

    begin
      Timeout.timeout(@max_graceful_wait) do
        loop do
          break if @sessions.empty?

          sleep 1
        end
      end
    rescue Timeout::Error => _
      GrpcKit.logger.debug('Max wait time expired')
    end
  end
end

#handle(handler) ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/grpc_kit/server.rb', line 19

def handle(handler)
  klass = handler.is_a?(Class) ? handler : handler.class
  unless klass.include?(GrpcKit::GRPC::GenericService)
    raise "#{klass} must include GRPC::GenericService"
  end

  klass.rpc_descs.each do |path, rpc_desc|
    if @rpc_descs[path]
      raise "Duplicated method registered #{path}, class: #{klass}"
    end

    s = handler.is_a?(Class) ? handler.new : handler
    @rpc_descs[path] = rpc_desc.build_server(s, interceptors: @interceptors)
  end
end

#run(conn) ⇒ Object



35
36
37
38
39
40
41
42
# File 'lib/grpc_kit/server.rb', line 35

def run(conn)
  raise 'Stopping server' if @stopping

  establish_session(conn) do |s|
    s.submit_settings([])
    s.start
  end
end

#session_countObject



76
77
78
# File 'lib/grpc_kit/server.rb', line 76

def session_count
  @mutex.synchronize { @sessions.size }
end