Class: Gruf::Server

Inherits:
Object
  • Object
show all
Includes:
Loggable
Defined in:
lib/gruf/server.rb

Overview

Represents a gRPC server. Automatically loads and augments gRPC handlers and services based on configuration values.

Defined Under Namespace

Classes: ServerAlreadyStartedError

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Loggable

#logger

Constructor Details

#initialize(opts = {}) ⇒ Server

Initialize the server and load and setup the services

Parameters:

  • opts (Hash) (defaults to: {})


36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/gruf/server.rb', line 36

def initialize(opts = {})
  @options = opts || {}
  @interceptors = opts.fetch(:interceptor_registry, Gruf.interceptors)
  @interceptors = Gruf::Interceptors::Registry.new unless @interceptors.is_a?(Gruf::Interceptors::Registry)
  @services = []
  @started = false
  @stop_server = false
  @stop_server_cv = ConditionVariable.new
  @stop_server_mu = Monitor.new
  @server_mu = Monitor.new
  @hostname = opts.fetch(:hostname, Gruf.server_binding_url)
  @event_listener_proc = opts.fetch(:event_listener_proc, Gruf.event_listener_proc)
  setup
end

Instance Attribute Details

#optionsHash (readonly)

Returns Hash of options passed into the server.

Returns:

  • (Hash)

    Hash of options passed into the server



29
30
31
# File 'lib/gruf/server.rb', line 29

def options
  @options
end

#portInteger (readonly)

Returns The port the server is bound to.

Returns:

  • (Integer)

    The port the server is bound to



27
28
29
# File 'lib/gruf/server.rb', line 27

def port
  @port
end

Instance Method Details

#add_interceptor(klass, opts = {}) ⇒ Object

Add an interceptor to the server

Parameters:

  • klass (Class)

    The Interceptor to add to the registry

  • opts (Hash) (defaults to: {})

    A hash of options for the interceptor

Raises:



133
134
135
136
# File 'lib/gruf/server.rb', line 133

def add_interceptor(klass, opts = {})
  raise ServerAlreadyStartedError if @started
  @interceptors.use(klass, opts)
end

#add_service(klass) ⇒ Object

Add a gRPC service stub to be served by gruf

Parameters:

  • klass (Class)

Raises:



121
122
123
124
# File 'lib/gruf/server.rb', line 121

def add_service(klass)
  raise ServerAlreadyStartedError if @started
  @services << klass unless @services.include?(klass)
end

#clear_interceptorsObject

Clear the interceptor registry of interceptors



184
185
186
187
# File 'lib/gruf/server.rb', line 184

def clear_interceptors
  raise ServerAlreadyStartedError if @started
  @interceptors.clear
end

#insert_interceptor_after(after_class, interceptor_class, opts = {}) ⇒ Object

Insert an interceptor after another in the currently registered order of execution

Parameters:

  • after_class (Class)

    The interceptor that you want to add the new interceptor after

  • interceptor_class (Class)

    The Interceptor to add to the registry

  • opts (Hash) (defaults to: {})

    A hash of options for the interceptor

Raises:



157
158
159
160
# File 'lib/gruf/server.rb', line 157

def insert_interceptor_after(after_class, interceptor_class, opts = {})
  raise ServerAlreadyStartedError if @started
  @interceptors.insert_after(after_class, interceptor_class, opts)
end

#insert_interceptor_before(before_class, interceptor_class, opts = {}) ⇒ Object

Insert an interceptor before another in the currently registered order of execution

Parameters:

  • before_class (Class)

    The interceptor that you want to add the new interceptor before

  • interceptor_class (Class)

    The Interceptor to add to the registry

  • opts (Hash) (defaults to: {})

    A hash of options for the interceptor

Raises:



145
146
147
148
# File 'lib/gruf/server.rb', line 145

def insert_interceptor_before(before_class, interceptor_class, opts = {})
  raise ServerAlreadyStartedError if @started
  @interceptors.insert_before(before_class, interceptor_class, opts)
end

#list_interceptorsArray<Class>

Return the current list of added interceptor classes

Returns:

  • (Array<Class>)


167
168
169
# File 'lib/gruf/server.rb', line 167

def list_interceptors
  @interceptors.list
end

#remove_interceptor(klass) ⇒ Object

Remove an interceptor from the server

Parameters:

  • klass (Class)

Raises:



176
177
178
179
# File 'lib/gruf/server.rb', line 176

def remove_interceptor(klass)
  raise ServerAlreadyStartedError if @started
  @interceptors.remove(klass)
end

#serverGRPC::RpcServer

Returns The GRPC server running.

Returns:

  • (GRPC::RpcServer)

    The GRPC server running



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/gruf/server.rb', line 54

def server
  @server_mu.synchronize do
    @server ||= begin
      # For backward compatibility, we allow these options to be passed directly
      # in the Gruf::Server options, or via Gruf.rpc_server_options.
      server_options = {
        pool_size: options.fetch(:pool_size, Gruf.rpc_server_options[:pool_size]),
        max_waiting_requests: options.fetch(:max_waiting_requests, Gruf.rpc_server_options[:max_waiting_requests]),
        poll_period: options.fetch(:poll_period, Gruf.rpc_server_options[:poll_period]),
        pool_keep_alive: options.fetch(:pool_keep_alive, Gruf.rpc_server_options[:pool_keep_alive]),
        connect_md_proc: options.fetch(:connect_md_proc, Gruf.rpc_server_options[:connect_md_proc]),
        server_args: options.fetch(:server_args, Gruf.rpc_server_options[:server_args])
      }

      server = if @event_listener_proc
                 server_options[:event_listener_proc] = @event_listener_proc
                 Gruf::InstrumentableGrpcServer.new(server_options)
               else
                 GRPC::RpcServer.new(server_options)
               end

      @port = server.add_http2_port(@hostname, ssl_credentials)
      @services.each { |s| server.handle(s) }
      server
    end
  end
end

#start!Object

Start the gRPC server

:nocov:



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/gruf/server.rb', line 86

def start!
  update_proc_title(:starting)

  server_thread = Thread.new do
    logger.info { "Starting gruf server at #{@hostname}..." }
    server.run
  end

  stop_server_thread = Thread.new do
    loop do
      break if @stop_server
      @stop_server_mu.synchronize { @stop_server_cv.wait(@stop_server_mu, 10) }
    end
    logger.info { 'Shutting down...' }
    server.stop
  end

  server.wait_till_running
  @started = true
  update_proc_title(:serving)
  stop_server_thread.join
  server_thread.join
  @started = false

  update_proc_title(:stopped)
  logger.info { 'Goodbye!' }
end