Class: GrpcKit::Server

Inherits:
Object
  • Object
show all
Defined in:
lib/grpc_kit/server.rb

Instance Method Summary collapse

Constructor Details

#initialize(interceptors: []) ⇒ Server

Returns a new instance of Server

Parameters:



9
10
11
12
13
14
15
16
17
18
# File 'lib/grpc_kit/server.rb', line 9

def initialize(interceptors: [])
  @sessions = []
  @rpc_descs = {}
  @interceptors = interceptors
  @mutex = Mutex.new
  @stopping = false
  @max_graceful_wait = 60

  GrpcKit.logger.debug("Launched grpc_kit(v#{GrpcKit::VERSION})")
end

Instance Method Details

#dispatch(path, stream) ⇒ void

This method returns an undefined value.

Parameters:

  • path (String)

    gRPC method path

  • stream (GrpcKit::Streams::ServerStream)


90
91
92
93
94
95
96
97
98
99
# File 'lib/grpc_kit/server.rb', line 90

def dispatch(path, stream)
  rpc = @rpc_descs[path]
  unless rpc
    e = GrpcKit::Errors::Unimplemented.new(path)
    stream.send_status(status: e.code, msg: e.message)
    return
  end

  stream.invoke(rpc)
end

#force_shutdownvoid

This method returns an undefined value.

This method is expected to be called in trap context



51
52
53
54
55
56
57
58
59
# File 'lib/grpc_kit/server.rb', line 51

def force_shutdown
  Thread.new do
    @mutex.synchronize do
      GrpcKit.logger.debug('force shutdown')
      @stopping = true
      @sessions.each(&:shutdown)
    end
  end
end

#graceful_shutdownvoid

This method returns an undefined value.

This method is expected to be called in trap context



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/grpc_kit/server.rb', line 63

def graceful_shutdown
  Thread.new do
    GrpcKit.logger.debug('graceful shutdown')
    @mutex.synchronize { @sessions.each(&:drain) }
    @stopping = true

    begin
      Timeout.timeout(@max_graceful_wait) do
        loop do
          break if @sessions.empty?

          sleep 1
        end
      end
    rescue Timeout::Error => _
      GrpcKit.logger.debug('Max wait time expired')
    end
  end
end

#handle(handler) ⇒ void

This method returns an undefined value.

Parameters:



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/grpc_kit/server.rb', line 22

def handle(handler)
  klass = handler.is_a?(Class) ? handler : handler.class
  unless klass.include?(GrpcKit::GRPC::GenericService)
    raise "#{klass} must include GRPC::GenericService"
  end

  klass.rpc_descs.each do |path, rpc_desc|
    if @rpc_descs[path]
      raise "Duplicated method registered #{path}, class: #{klass}"
    end

    s = handler.is_a?(Class) ? handler.new : handler
    @rpc_descs[path] = rpc_desc.build_server(s, interceptors: @interceptors)
  end
end

#run(conn) ⇒ void

This method returns an undefined value.

Parameters:

  • conn (TCPSocket)


40
41
42
43
44
45
46
47
# File 'lib/grpc_kit/server.rb', line 40

def run(conn)
  raise 'Stopping server' if @stopping

  establish_session(conn) do |s|
    s.submit_settings([])
    s.start
  end
end

#session_countObject



83
84
85
# File 'lib/grpc_kit/server.rb', line 83

def session_count
  @mutex.synchronize { @sessions.size }
end