Class: GrpcKit::Server
- Inherits: Object
- Hierarchy: Object → GrpcKit::Server
- Defined in:
- lib/grpc_kit/server.rb
Instance Method Summary
- #dispatch(path, stream) ⇒ Object
- #force_shutdown ⇒ Object
- #graceful_shutdown ⇒ Object
- #handle(handler) ⇒ Object
- #initialize(interceptors: []) ⇒ Server (constructor): A new instance of Server.
- #run(conn) ⇒ Object
- #session_count ⇒ Object
Constructor Details
#initialize(interceptors: []) ⇒ Server
Returns a new instance of Server.
8 9 10 11 12 13 14 15 16 17 |
# File 'lib/grpc_kit/server.rb', line 8

# Builds a server with no registered RPCs and no live sessions.
#
# @param interceptors [Array] passed as `interceptors:` to every RPC
#   that #handle builds
# @return [Server] a new instance of Server
def initialize(interceptors: [])
  @sessions = []
  @rpc_descs = {}
  @interceptors = interceptors
  @mutex = Mutex.new
  @stopping = false
  # Passed to Timeout.timeout by #graceful_shutdown.
  @max_graceful_wait = 60

  GrpcKit.logger.debug("Launched grpc_kit(v#{GrpcKit::VERSION})")
end
Instance Method Details
#dispatch(path, stream) ⇒ Object
82 83 84 85 86 87 88 89 90 91 |
# File 'lib/grpc_kit/server.rb', line 82

# Routes an incoming call to the RPC registered under +path+.
#
# When nothing is registered for +path+, an Unimplemented status is sent
# on the stream instead and nil is returned.
#
# @param path [Object] key looked up in the registered RPC table
# @param stream [#invoke, #send_status] stream the call arrived on
# @return [Object, nil] the result of `stream.invoke`, or nil when the
#   path is not registered
def dispatch(path, stream)
  rpc = @rpc_descs[path]
  unless rpc
    e = GrpcKit::Errors::Unimplemented.new(path)
    stream.send_status(status: e.code, msg: e.message)
    return
  end

  stream.invoke(rpc)
end
#force_shutdown ⇒ Object
44 45 46 47 48 49 50 51 52 53 |
# File 'lib/grpc_kit/server.rb', line 44

# Marks the server as stopping and calls `shutdown` on every tracked
# session, all on a freshly spawned thread.
#
# @return [Thread] the thread performing the shutdown
def force_shutdown
  # expected to be called in trap context
  # (Ruby does not allow locking a Mutex inside a signal handler, hence
  # the separate thread.)
  Thread.new do
    @mutex.synchronize do
      GrpcKit.logger.debug('force shutdown')
      @stopping = true
      @sessions.each(&:shutdown)
    end
  end
end
#graceful_shutdown ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/grpc_kit/server.rb', line 55

# Asks every tracked session to drain, then waits on a background thread
# until no sessions remain or the maximum graceful wait has elapsed.
#
# @return [Thread] the thread performing the drain and the wait
def graceful_shutdown
  # expected to be called in trap context
  # Raise the flag before draining so #run refuses new connections from
  # this point on; a session accepted after the drain pass would never be
  # asked to drain.
  @stopping = true

  # Ruby does not allow locking a Mutex inside a signal handler, hence
  # the separate thread.
  Thread.new do
    GrpcKit.logger.debug('graceful shutdown')
    @mutex.synchronize { @sessions.each(&:drain) }

    begin
      Timeout.timeout(@max_graceful_wait) do
        # Poll once per second until the session list is empty.
        loop do
          break if @sessions.empty?

          sleep 1
        end
      end
    rescue Timeout::Error
      GrpcKit.logger.debug('Max wait time expired')
    end
  end
end
#handle(handler) ⇒ Object
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/grpc_kit/server.rb', line 19

# Registers every RPC described by a service handler.
#
# +handler+ may be a service class or an instance of one. When a class is
# given, a new instance is created for each RPC path.
#
# @param handler [Class, Object] service class or service instance whose
#   class includes GrpcKit::GRPC::GenericService
# @return [Object] the handler class's `rpc_descs` collection
# @raise [RuntimeError] if the class does not include
#   GrpcKit::GRPC::GenericService, or if any of its paths is already
#   registered
def handle(handler)
  klass = handler.is_a?(Class) ? handler : handler.class
  unless klass.include?(GrpcKit::GRPC::GenericService)
    raise "#{klass} must include GRPC::GenericService"
  end

  descs = klass.rpc_descs

  # Check every path before registering any of them, so a duplicate does
  # not leave this handler half-registered.
  descs.each do |path, _rpc_desc|
    if @rpc_descs[path]
      raise "Duplicated method registered #{path}, class: #{klass}"
    end
  end

  descs.each do |path, rpc_desc|
    s = handler.is_a?(Class) ? handler.new : handler
    @rpc_descs[path] = rpc_desc.build_server(s, interceptors: @interceptors)
  end
end
#run(conn) ⇒ Object
35 36 37 38 39 40 41 42 |
# File 'lib/grpc_kit/server.rb', line 35

# Serves one connection: establishes a session for +conn+, submits an
# empty settings list and starts the session.
#
# @param conn [Object] connection handed to `establish_session`
# @return [Object] whatever `establish_session` returns
# @raise [RuntimeError] if a shutdown has already been requested
def run(conn)
  raise 'Stopping server' if @stopping

  establish_session(conn) do |s|
    s.submit_settings([])
    s.start
  end
end
#session_count ⇒ Object
76 77 78 |
# File 'lib/grpc_kit/server.rb', line 76

# Number of sessions currently tracked by the server, read while holding
# the server mutex.
#
# @return [Integer] size of the session list
def session_count
  @mutex.synchronize { @sessions.size }
end