Class: RDKit::Server

Inherits:
Object show all
Extended by:
ClassMethods
Includes:
Callbacks, MemoryMonitoring
Defined in:
lib/rdkit/server.rb

Defined Under Namespace

Modules: ClassMethods

Constant Summary collapse

HZ =
10
HANDLED_SIGNALS =
[ :TERM, :INT, :HUP ]

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from ClassMethods

instance, register

Methods included from Callbacks

#client_block_resumed, #client_blocked, #client_command_processed, #client_connected, #client_disconnected, #server_started

Methods included from MemoryMonitoring

#used_memory_peak_in_mb, #used_memory_rss_in_mb

Constructor Details

#initialize(host, port) ⇒ Server

Returns a new instance of Server.



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/rdkit/server.rb', line 24

def initialize(host, port)
  @host, @port = host, port

  @cycles = 0
  @peak_connected_clients = 0
  @client_id_seq = 0

  @clients = Hash.new
  @blocked_clients = Hash.new
  @monitors = []

  @logger = Logger.new(ENV['RDKIT_LOG_PATH'])
  @current_db = DB.new(0)
  @all_dbs = [@current_db]

  Introspection.register(self)

  @server_up_since = Time.now

  @parser_class = RESPParser

  register_notification_observers!

  Server.register(self)

  # Self-pipe for deferred signal-handling http://www.sitepoint.com/the-self-pipe-trick-explained/
  # Borrowed from `Foreman::Engine`
  reader, writer = create_pipe
  @selfpipe      = { :reader => reader, :writer => writer }
  @signal_queue  = []
end

Instance Attribute Details

#coreObject (readonly)

Returns the value of attribute core.



13
14
15
# File 'lib/rdkit/server.rb', line 13

def core
  @core
end

#current_clientObject (readonly)

Returns the value of attribute current_client.



11
12
13
# File 'lib/rdkit/server.rb', line 11

def current_client
  @current_client
end

#current_dbObject (readonly)

Returns the value of attribute current_db.



12
13
14
# File 'lib/rdkit/server.rb', line 12

def current_db
  @current_db
end

#cyclesObject (readonly)

Returns the value of attribute cycles.



17
18
19
# File 'lib/rdkit/server.rb', line 17

def cycles
  @cycles
end

#hostObject (readonly)

Returns the value of attribute host.



14
15
16
# File 'lib/rdkit/server.rb', line 14

def host
  @host
end

#loggerObject (readonly)

Returns the value of attribute logger.



15
16
17
# File 'lib/rdkit/server.rb', line 15

def logger
  @logger
end

#monitorsObject (readonly)

Returns the value of attribute monitors.



16
17
18
# File 'lib/rdkit/server.rb', line 16

def monitors
  @monitors
end

#parser_classObject

Returns the value of attribute parser_class.



18
19
20
# File 'lib/rdkit/server.rb', line 18

def parser_class
  @parser_class
end

#portObject (readonly)

Returns the value of attribute port.



14
15
16
# File 'lib/rdkit/server.rb', line 14

def port
  @port
end

#server_up_sinceObject (readonly)

Returns the value of attribute server_up_since.



10
11
12
# File 'lib/rdkit/server.rb', line 10

def server_up_since
  @server_up_since
end

Instance Method Details

#blocking(on_success = nil, &block) ⇒ Object



199
200
201
202
203
204
# File 'lib/rdkit/server.rb', line 199

def blocking(on_success=nil, &block)
  @blocked_clients[current_client.socket] = current_client
  @clients.delete(current_client.socket)

  current_client.blocking(on_success, &block)
end

#clientsObject



175
176
177
# File 'lib/rdkit/server.rb', line 175

def clients
  @clients.values
end

#create_pipeObject



71
72
73
# File 'lib/rdkit/server.rb', line 71

def create_pipe
  IO.method(:pipe).arity.zero? ? IO.pipe : IO.pipe("BINARY")
end

#delete(socket) ⇒ Object



171
172
173
# File 'lib/rdkit/server.rb', line 171

def delete(socket)
  @clients.delete(socket)
end

#flushall!Object



193
194
195
196
197
# File 'lib/rdkit/server.rb', line 193

def flushall!
  flushdb!

  @all_dbs = [@current_db]
end

#flushdb!Object



189
190
191
# File 'lib/rdkit/server.rb', line 189

def flushdb!
  @current_db.flush!
end

#handle_hangupObject

Handle a HUP signal



132
133
134
135
# File 'lib/rdkit/server.rb', line 132

def handle_hangup
  @logger.warn "SIGHUP received"
  terminate_gracefully
end

#handle_interruptObject

Handle an INT signal



125
126
127
128
# File 'lib/rdkit/server.rb', line 125

def handle_interrupt
  @logger.warn "SIGINT received"
  terminate_gracefully
end

#handle_signal(sig) ⇒ Object

Invoke the real handler for signal sig. This shouldn’t be called directly by signal handlers, as it might invoke code which isn’t re-entrant.

Parameters:

  • sig (Symbol)

    the name of the signal to be handled



103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/rdkit/server.rb', line 103

def handle_signal(sig)
  case sig
  when :TERM
    handle_term_signal
  when :INT
    handle_interrupt
  when :HUP
    handle_hangup
  else
    system "unhandled signal #{sig}"
  end
end

#handle_signalsObject



92
93
94
95
96
# File 'lib/rdkit/server.rb', line 92

def handle_signals
  while sig = @signal_queue.shift
    handle_signal(sig)
  end
end

#handle_term_signalObject

Handle a TERM signal



118
119
120
121
# File 'lib/rdkit/server.rb', line 118

def handle_term_signal
  @logger.warn "SIGTERM received"
  terminate_gracefully
end

#introspectionObject



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/rdkit/server.rb', line 147

def introspection
  {
    server: {
      ruby_version: "#{RUBY_VERSION}p#{RUBY_PATCHLEVEL}",
      rdkit_version: RDKit::VERSION,
      multiplexing_api: 'select',
      process_id: Process.pid,
      tcp_port: @port,
      uptime_in_seconds: (Time.now - @server_up_since).to_i,
      uptime_in_days: ((Time.now - @server_up_since) / (24 * 60 * 60)).to_i,
      hz: HZ,
    },
    clients: {
      blocked_clients: @blocked_clients.size,
      connected_clients: @clients.size,
      connected_clients_peak: @peak_connected_clients
    },
    memory: {
      used_memory_rss: used_memory_rss_in_mb,
      used_memory_peak: used_memory_peak_in_mb
    },
  }
end

#notice_signalObject



83
84
85
86
87
88
89
90
# File 'lib/rdkit/server.rb', line 83

def notice_signal
  @selfpipe[:writer].write_nonblock('.')
rescue Errno::EAGAIN
  # Ignore writes that would block
rescue Errno::EINT
  # Retry if another signal arrived while writing
  retry
end

#poolObject



206
207
208
# File 'lib/rdkit/server.rb', line 206

def pool
  @pool ||= Thread.pool((ENV['RDKIT_SERVER_THREAD_POOL_SIZE'] || 10).to_i)
end

#register_signal_handlersObject



75
76
77
78
79
80
81
# File 'lib/rdkit/server.rb', line 75

def register_signal_handlers
  HANDLED_SIGNALS.each do |sig|
    if ::Signal.list.include? sig.to_s
      trap(sig) { @signal_queue << sig ; notice_signal }
    end
  end
end

#responderObject



20
21
22
# File 'lib/rdkit/server.rb', line 20

def responder
  @responder ||= (( @runner && $stderr.puts("@runner is deprecated, use @responder instead") ) || @runner)
end

#select_db!(index) ⇒ Object



179
180
181
182
183
184
185
186
187
# File 'lib/rdkit/server.rb', line 179

def select_db!(index)
  if db = @all_dbs.find { |db| db.index == index }
    @current_db = db
  else
    @all_dbs << DB.new(index)

    @current_db = @all_dbs.last
  end
end

#startObject



56
57
58
59
60
61
62
63
64
# File 'lib/rdkit/server.rb', line 56

def start
  sanity_check!

  register_signal_handlers

  @server_socket = TCPServer.new(@host, @port)

  run_acceptor
end

#stopObject



66
67
68
69
# File 'lib/rdkit/server.rb', line 66

def stop
  @logger.warn "shutting down..."
  exit
end

#terminate_gracefullyObject



137
138
139
140
141
142
143
# File 'lib/rdkit/server.rb', line 137

def terminate_gracefully
  return if @terminating

  @terminating = true

  stop
end