Class: Gruf::Server

Inherits:
Object
  • Object
show all
Includes:
Loggable
Defined in:
lib/gruf/server.rb

Overview

Represents a gRPC server. Automatically loads and augments gRPC handlers and services based on configuration values.

Defined Under Namespace

Classes: ServerAlreadyStartedError

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Loggable

#logger

Constructor Details

#initialize(opts = {}) ⇒ Server

Initialize the server and load and setup the services

Parameters:

  • opts (Hash) (defaults to: {})


38
39
40
# File 'lib/gruf/server.rb', line 38

# Initialize the server and load and setup the services.
#
# All of the work is delegated to #init, which receives +opts+ unchanged.
#
# @param opts [Hash] options for the server (defaults to an empty Hash)
def initialize(opts = {})
  init(opts)
end

Instance Attribute Details

#options ⇒ Hash (readonly)

Returns Hash of options passed into the server.

Returns:

  • (Hash)

    Hash of options passed into the server



31
32
33
# File 'lib/gruf/server.rb', line 31

# Read-only accessor for the options the server was configured with.
#
# @return [Hash] Hash of options passed into the server
def options
  @options
end

#port ⇒ Integer (readonly)

Returns The port the server is bound to.

Returns:

  • (Integer)

    The port the server is bound to



29
30
31
# File 'lib/gruf/server.rb', line 29

# Read-only accessor for the bound port.
#
# NOTE(review): in the code visible here @port is only assigned inside #server (from
# add_http2_port), so this presumably returns nil until #server has been called - confirm.
#
# @return [Integer] The port the server is bound to
def port
  @port
end

Instance Method Details

#add_interceptor(klass, opts = {}) ⇒ Object

Add an interceptor to the server

Parameters:

  • klass (Class)

    The Interceptor to add to the registry

  • opts (Hash) (defaults to: {})

    A hash of options for the interceptor

Raises:

  • (ServerAlreadyStartedError)

    if the server has already been started


152
153
154
155
156
# File 'lib/gruf/server.rb', line 152

# Add an interceptor to the server.
#
# @param klass [Class] The Interceptor to add to the registry
# @param opts [Hash] A hash of options for the interceptor
# @raise [ServerAlreadyStartedError] if the server has already been started
# @return [Object] whatever the interceptor registry's #use returns
def add_interceptor(klass, opts = {})
  if @started
    raise ServerAlreadyStartedError
  else
    @interceptors.use(klass, opts)
  end
end

#add_service(klass) ⇒ Object

Add a gRPC service stub to be served by gruf

Parameters:

  • klass (Class)

Raises:

  • (ServerAlreadyStartedError)

    if the server has already been started


139
140
141
142
143
# File 'lib/gruf/server.rb', line 139

# Add a gRPC service stub to be served by gruf.
#
# A class that is already registered is not added a second time.
#
# @param klass [Class] the service class to register
# @raise [ServerAlreadyStartedError] if the server has already been started
# @return [Array<Class>, nil] the list of services when +klass+ was added, nil when it was
#   already present
def add_service(klass)
  raise ServerAlreadyStartedError if @started
  return if @services.include?(klass)

  @services.push(klass)
end

#clear_interceptors ⇒ Object

Clear the interceptor registry of interceptors



207
208
209
210
211
# File 'lib/gruf/server.rb', line 207

# Clear the interceptor registry of interceptors.
#
# @raise [ServerAlreadyStartedError] if the server has already been started
# @return [Object] whatever the interceptor registry's #clear returns
def clear_interceptors
  return @interceptors.clear unless @started

  raise ServerAlreadyStartedError
end

#init(opts = {}) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/gruf/server.rb', line 42

# (Re)initialise the server's internal state from the given options, then run #setup.
#
# Every option is read through @options rather than the raw +opts+ argument, so an explicit
# +nil+ behaves exactly like an empty Hash instead of raising NoMethodError on +nil.fetch+.
#
# Recognised keys (each falls back to the matching Gruf-level setting):
# * +:interceptor_registry+ - falls back to Gruf.interceptors
# * +:hostname+ - falls back to Gruf.server_binding_url
# * +:event_listener_proc+ - falls back to Gruf.event_listener_proc
#
# @param opts [Hash, nil] options for the server
# @return [Object] whatever #setup returns
def init(opts = {})
  @options = opts || {}
  @interceptors = @options.fetch(:interceptor_registry, Gruf.interceptors)
  # Anything that is not a real registry is replaced by a fresh, empty one.
  @interceptors = Gruf::Interceptors::Registry.new unless @interceptors.is_a?(Gruf::Interceptors::Registry)
  @services = []
  @started = false
  @stop_server = false
  @stop_server_cv = ConditionVariable.new
  @stop_server_mu = Monitor.new
  @server_mu = Monitor.new
  @hostname = @options.fetch(:hostname, Gruf.server_binding_url)
  @event_listener_proc = @options.fetch(:event_listener_proc, Gruf.event_listener_proc)
  setup
end

#init_restart ⇒ Object



121
122
123
124
125
# File 'lib/gruf/server.rb', line 121

# Flag the server for a restart and request that the running server stop.
#
# Logs the restart, then sets @restart (readable via #required_restart) and @stop_server
# (the flag polled by the stop thread in #start!).
#
# @return [Boolean] true (the value of the final assignment)
def init_restart
  logger.info { 'Restarting gruf...' }
  # NOTE(review): @restart is set before @stop_server, presumably so that a reader that
  # observes the stop flag also sees the restart flag - confirm before reordering.
  @restart = true
  @stop_server = true
end

#insert_interceptor_after(after_class, interceptor_class, opts = {}) ⇒ Object

Insert an interceptor after another in the currently registered order of execution

Parameters:

  • after_class (Class)

    The interceptor that you want to add the new interceptor after

  • interceptor_class (Class)

    The Interceptor to add to the registry

  • opts (Hash) (defaults to: {})

    A hash of options for the interceptor

Raises:

  • (ServerAlreadyStartedError)

    if the server has already been started


178
179
180
181
182
# File 'lib/gruf/server.rb', line 178

# Insert an interceptor after another in the currently registered order of execution.
#
# @param after_class [Class] The interceptor that you want to add the new interceptor after
# @param interceptor_class [Class] The Interceptor to add to the registry
# @param opts [Hash] A hash of options for the interceptor
# @raise [ServerAlreadyStartedError] if the server has already been started
# @return [Object] whatever the interceptor registry's #insert_after returns
def insert_interceptor_after(after_class, interceptor_class, opts = {})
  if @started
    raise ServerAlreadyStartedError
  else
    @interceptors.insert_after(after_class, interceptor_class, opts)
  end
end

#insert_interceptor_before(before_class, interceptor_class, opts = {}) ⇒ Object

Insert an interceptor before another in the currently registered order of execution

Parameters:

  • before_class (Class)

    The interceptor that you want to add the new interceptor before

  • interceptor_class (Class)

    The Interceptor to add to the registry

  • opts (Hash) (defaults to: {})

    A hash of options for the interceptor

Raises:

  • (ServerAlreadyStartedError)

    if the server has already been started


165
166
167
168
169
# File 'lib/gruf/server.rb', line 165

# Insert an interceptor before another in the currently registered order of execution.
#
# @param before_class [Class] The interceptor that you want to add the new interceptor before
# @param interceptor_class [Class] The Interceptor to add to the registry
# @param opts [Hash] A hash of options for the interceptor
# @raise [ServerAlreadyStartedError] if the server has already been started
# @return [Object] whatever the interceptor registry's #insert_before returns
def insert_interceptor_before(before_class, interceptor_class, opts = {})
  return @interceptors.insert_before(before_class, interceptor_class, opts) unless @started

  raise ServerAlreadyStartedError
end

#list_interceptors ⇒ Array<Class>

Return the current list of added interceptor classes

Returns:

  • (Array<Class>)


189
190
191
# File 'lib/gruf/server.rb', line 189

# Return the current list of added interceptor classes.
#
# Unlike the mutating interceptor methods, this does not check whether the server has started.
#
# @return [Array<Class>] the result of the interceptor registry's #list
def list_interceptors
  @interceptors.list
end

#remove_interceptor(klass) ⇒ Object

Remove an interceptor from the server

Parameters:

  • klass (Class)

Raises:

  • (ServerAlreadyStartedError)

    if the server has already been started


198
199
200
201
202
# File 'lib/gruf/server.rb', line 198

# Remove an interceptor from the server.
#
# @param klass [Class] the interceptor class to remove from the registry
# @raise [ServerAlreadyStartedError] if the server has already been started
# @return [Object] whatever the interceptor registry's #remove returns
def remove_interceptor(klass)
  if @started
    raise ServerAlreadyStartedError
  else
    @interceptors.remove(klass)
  end
end

#required_restart ⇒ Object



127
128
129
# File 'lib/gruf/server.rb', line 127

# Report whether a restart has been requested via #init_restart.
#
# @return [Boolean, nil] the current value of @restart; #init_restart sets it to true.
#   NOTE(review): no other assignment to @restart is visible here, so this is presumably
#   nil until a restart is requested - confirm.
def required_restart
  @restart
end

#server ⇒ GRPC::RpcServer

Returns The GRPC server running.

Returns:

  • (GRPC::RpcServer)

    The GRPC server running



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/gruf/server.rb', line 60

# Lazily build, memoize and return the underlying gRPC server.
#
# On first call (guarded by @server_mu) this:
# 1. collects the RPC server options, preferring values passed directly in the Gruf::Server
#    options and falling back to Gruf.rpc_server_options (kept for backward compatibility);
# 2. instantiates Gruf::InstrumentableGrpcServer when an event listener proc is configured,
#    otherwise a plain GRPC::RpcServer;
# 3. binds the server to @hostname, storing the resulting port in @port;
# 4. registers every service previously added to @services.
#
# The options are splatted as keyword arguments (+**+) rather than passed as a positional
# Hash: a keyword-only initializer rejects a positional Hash on Ruby 3.0+.
#
# @return [GRPC::RpcServer] The GRPC server running
def server
  @server_mu.synchronize do
    @server ||= begin
      option_keys = %i[pool_size max_waiting_requests poll_period pool_keep_alive connect_md_proc server_args]
      server_options = option_keys.each_with_object({}) do |key, acc|
        acc[key] = options.fetch(key, Gruf.rpc_server_options[key])
      end

      rpc_server = if @event_listener_proc
                     server_options[:event_listener_proc] = @event_listener_proc
                     Gruf::InstrumentableGrpcServer.new(**server_options)
                   else
                     GRPC::RpcServer.new(**server_options)
                   end

      @port = rpc_server.add_http2_port(@hostname, ssl_credentials)
      @services.each { |service| rpc_server.handle(service) }
      rpc_server
    end
  end
end

#start! ⇒ Object

Start the gRPC server

:nocov:



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/gruf/server.rb', line 92

# Start the gRPC server and block the calling thread until it has been stopped.
#
# Two threads are spawned: one runs the gRPC server, the other waits for @stop_server to
# become truthy and then stops the server. The caller is blocked until both have finished.
# @started is true only between the server reporting that it is running and both threads
# having been joined.
#
# @return [Object] the result of the final logger.info call
def start!
  update_proc_title(:starting)

  # Run the gRPC server on its own thread; `server.run` presumably blocks until stopped.
  server_thread = Thread.new do
    logger.info { "Starting gruf server at #{@hostname}..." }
    server.run
  end

  # Watcher thread: re-checks the stop flag after each timed wait (timeout of 2), then stops
  # the server once the flag is set.
  # NOTE(review): @stop_server_mu is a Monitor (see #init) while @stop_server_cv is a plain
  # ConditionVariable, and no signal on it is visible here - confirm this pairing behaves as
  # a timed wait as intended.
  stop_server_thread = Thread.new do
    loop do
      break if @stop_server

      @stop_server_mu.synchronize { @stop_server_cv.wait(@stop_server_mu, 2) }
    end
    logger.info { 'Shutting down...' }
    server.stop
  end

  # Only mark the server as started once gRPC reports that it is running.
  server.wait_till_running
  @started = true
  update_proc_title(:serving)
  # Block until a stop has been requested and the server thread has finished.
  stop_server_thread.join
  server_thread.join
  @started = false

  update_proc_title(:stopped)
  logger.info { 'Goodbye!' }
end