Class: RDKit::Server

Inherits:
Object show all
Extended by:
ClassMethods
Includes:
Callbacks, MemoryMonitoring
Defined in:
lib/rdkit/server.rb

Defined Under Namespace

Modules: ClassMethods

Constant Summary collapse

HZ =
10
HANDLED_SIGNALS =
[ :TERM, :INT, :HUP ]

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from ClassMethods

instance, register

Methods included from Callbacks

#client_block_resumed, #client_blocked, #client_command_processed, #client_connected, #client_disconnected, #server_started

Methods included from MemoryMonitoring

#used_memory_peak_in_mb, #used_memory_rss_in_mb

Constructor Details

#initialize(host, port) ⇒ Server

Returns a new instance of Server.



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/rdkit/server.rb', line 24

# Prepares in-memory server state and registers the new instance with
# Introspection and Server. No listening socket is opened here; #start does
# that using the stored host and port.
#
# @param host the address later handed to TCPServer.new by #start
# @param port the port later handed to TCPServer.new by #start
def initialize(host, port)
  @host = host
  @port = port

  # Counters start at zero.
  @cycles = 0
  @peak_connected_clients = 0
  @client_id_seq = 0

  # Both client tables are keyed by socket (see #blocking and #delete).
  @clients = {}
  @blocked_clients = {}
  @monitors = []

  # Log destination comes from the RDKIT_LOG_PATH environment variable.
  @logger = Logger.new(ENV['RDKIT_LOG_PATH'])

  # A single database with index 0 exists and is selected at boot.
  @current_db = DB.new(0)
  @all_dbs = [@current_db]

  Introspection.register(self)

  @server_up_since = Time.now

  @parser_class = RESPParser

  register_notification_observers!

  Server.register(self)

  # Self-pipe for deferred signal-handling http://www.sitepoint.com/the-self-pipe-trick-explained/
  # Borrowed from `Foreman::Engine`
  pipe_reader, pipe_writer = create_pipe
  @selfpipe = { reader: pipe_reader, writer: pipe_writer }
  @signal_queue = []

  @additional_io_handlers = {}
end

Instance Attribute Details

#core ⇒ Object (readonly)

Returns the value of attribute core.



13
14
15
# File 'lib/rdkit/server.rb', line 13

# Reader for the @core instance variable.
#
# @return [Object] whatever is currently stored in @core
def core
  @core
end

#current_client ⇒ Object (readonly)

Returns the value of attribute current_client.



11
12
13
# File 'lib/rdkit/server.rb', line 11

# Reader for the @current_client instance variable. #blocking relies on this
# to find the client (and its socket) that should be parked.
#
# @return [Object] whatever is currently stored in @current_client
def current_client
  @current_client
end

#current_db ⇒ Object (readonly)

Returns the value of attribute current_db.



12
13
14
# File 'lib/rdkit/server.rb', line 12

# Reader for the currently selected database. It is set to DB.new(0) in
# #initialize and replaced by #select_db!.
#
# @return [Object] the object currently stored in @current_db
def current_db
  @current_db
end

#cycles ⇒ Object (readonly)

Returns the value of attribute cycles.



17
18
19
# File 'lib/rdkit/server.rb', line 17

# Reader for the @cycles counter, which #initialize sets to 0.
# NOTE(review): the code that increments it is not visible here.
#
# @return [Object] the value currently stored in @cycles
def cycles
  @cycles
end

#host ⇒ Object (readonly)

Returns the value of attribute host.



14
15
16
# File 'lib/rdkit/server.rb', line 14

# Reader for the host given to #initialize; #start passes it to TCPServer.new.
#
# @return [Object] the value stored in @host
def host
  @host
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



15
16
17
# File 'lib/rdkit/server.rb', line 15

# Reader for the Logger built in #initialize from ENV['RDKIT_LOG_PATH'].
#
# @return [Logger] the value stored in @logger
def logger
  @logger
end

#monitors ⇒ Object (readonly)

Returns the value of attribute monitors.



16
17
18
# File 'lib/rdkit/server.rb', line 16

# Reader for @monitors, which #initialize sets to an empty array.
# The array itself is returned, not a copy.
#
# @return [Array] the value stored in @monitors
def monitors
  @monitors
end

#parser_class ⇒ Object

Returns the value of attribute parser_class.



18
19
20
# File 'lib/rdkit/server.rb', line 18

# Reader for @parser_class, which #initialize sets to RESPParser.
#
# @return [Object] the value stored in @parser_class
def parser_class
  @parser_class
end

#port ⇒ Object (readonly)

Returns the value of attribute port.



14
15
16
# File 'lib/rdkit/server.rb', line 14

# Reader for the port given to #initialize; #start passes it to TCPServer.new
# and #introspection reports it as :tcp_port.
#
# @return [Object] the value stored in @port
def port
  @port
end

#server_up_since ⇒ Object (readonly)

Returns the value of attribute server_up_since.



10
11
12
# File 'lib/rdkit/server.rb', line 10

# Reader for the Time captured in #initialize; #introspection derives the
# uptime figures from it.
#
# @return [Time] the value stored in @server_up_since
def server_up_since
  @server_up_since
end

Instance Method Details

#blocking(on_success = nil, &block) ⇒ Object



205
206
207
208
209
210
# File 'lib/rdkit/server.rb', line 205

# Parks the current client: it is removed from the active client table,
# recorded in the blocked table under the same socket key, and then asked to
# block itself.
#
# @param on_success forwarded untouched to the client's own #blocking
# @param block forwarded untouched to the client's own #blocking
# @return [Object] whatever the client's #blocking returns
def blocking(on_success=nil, &block)
  client = current_client
  socket = client.socket

  @clients.delete(socket)
  @blocked_clients[socket] = client

  client.blocking(on_success, &block)
end

#clients ⇒ Object



181
182
183
# File 'lib/rdkit/server.rb', line 181

# Lists the clients in the active table. Clients parked by #blocking have
# been moved to @blocked_clients and are therefore not included.
#
# @return [Array] a new array holding the values of the @clients hash
def clients
  @clients.values
end

#create_pipe ⇒ Object



77
78
79
# File 'lib/rdkit/server.rb', line 77

# Creates the pipe pair used for the self-pipe trick.
#
# The "BINARY" encoding argument is passed only when IO.pipe reports that it
# accepts arguments; otherwise the plain form is used.
#
# @return [Array<IO>] the [reader, writer] pair returned by IO.pipe
def create_pipe
  if IO.method(:pipe).arity.zero?
    IO.pipe
  else
    IO.pipe("BINARY")
  end
end

#delete(socket) ⇒ Object



177
178
179
# File 'lib/rdkit/server.rb', line 177

# Removes the entry keyed by +socket+ from the active client table.
# @blocked_clients is not touched.
#
# @param socket the key under which the client was stored in @clients
# @return [Object, nil] the removed value, or nil when no entry existed
def delete(socket)
  @clients.delete(socket)
end

#flushall! ⇒ Object



199
200
201
202
203
# File 'lib/rdkit/server.rb', line 199

# Flushes the current database and forgets every other one.
#
# @return [Array] the new @all_dbs, containing only the current database
def flushall!
  flushdb!

  # Other databases are not flushed individually; they are simply dropped
  # from the list so that only the (now flushed) current one remains.
  @all_dbs = [@current_db]
end

#flushdb! ⇒ Object



195
196
197
# File 'lib/rdkit/server.rb', line 195

# Flushes the currently selected database by delegating to its #flush!.
#
# @return [Object] whatever @current_db.flush! returns
def flushdb!
  @current_db.flush!
end

#handle_hangup ⇒ Object

Handle a HUP signal



138
139
140
141
# File 'lib/rdkit/server.rb', line 138

# Handle a HUP signal: log a warning, then start graceful termination.
def handle_hangup
  @logger.warn "SIGHUP received"
  terminate_gracefully
end

#handle_interrupt ⇒ Object

Handle an INT signal



131
132
133
134
# File 'lib/rdkit/server.rb', line 131

# Handle an INT signal: log a warning, then start graceful termination.
def handle_interrupt
  @logger.warn "SIGINT received"
  terminate_gracefully
end

#handle_signal(sig) ⇒ Object

Invoke the real handler for signal sig. This shouldn’t be called directly by signal handlers, as it might invoke code which isn’t re-entrant.

Parameters:

  • sig (Symbol)

    the name of the signal to be handled



109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/rdkit/server.rb', line 109

# Invoke the real handler for signal +sig+. This shouldn't be called directly
# from a trap block, as it might invoke code which isn't re-entrant; trap
# blocks only enqueue the signal and #handle_signals calls this later.
#
# @param sig [Symbol] the name of the signal to be handled
def handle_signal(sig)
  case sig
  when :TERM then handle_term_signal
  when :INT  then handle_interrupt
  when :HUP  then handle_hangup
  else
    # This is a diagnostic, not a command: Kernel#system would pass the text
    # to a shell for execution, so report it through the logger instead.
    @logger.warn "unhandled signal #{sig}"
  end
end

#handle_signals ⇒ Object



98
99
100
101
102
# File 'lib/rdkit/server.rb', line 98

# Drains the pending-signal queue in arrival order, handing each entry to
# #handle_signal.
#
# Entries are taken one at a time with +shift+ rather than iterated, because
# the trap blocks installed by #register_signal_handlers append to the same
# array. Draining stops at the first nil/false entry or when the queue is
# empty.
#
# @return [nil]
def handle_signals
  pending = @signal_queue.shift
  while pending
    handle_signal(pending)
    pending = @signal_queue.shift
  end
end

#handle_term_signal ⇒ Object

Handle a TERM signal



124
125
126
127
# File 'lib/rdkit/server.rb', line 124

# Handle a TERM signal: log a warning, then start graceful termination.
def handle_term_signal
  @logger.warn "SIGTERM received"
  terminate_gracefully
end

#inject_io_handler(another_io, &block) ⇒ Object



58
59
60
# File 'lib/rdkit/server.rb', line 58

# Stores +block+ as the handler for +another_io+, replacing any handler
# previously stored for the same object.
# NOTE(review): the code that consumes @additional_io_handlers is not visible
# here, so when and with what arguments the block is invoked must be checked
# against the event loop.
#
# @param another_io the key to associate the handler with
# @param block the handler to store (nil when no block is given)
# @return [Proc, nil] the stored block
def inject_io_handler(another_io, &block)
  @additional_io_handlers[another_io] = block
end

#introspection ⇒ Object



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/rdkit/server.rb', line 153

# Builds a snapshot of server, client and memory statistics.
#
# The clock is sampled once so that :uptime_in_seconds and :uptime_in_days
# always describe the same instant.
#
# @return [Hash] a hash with :server, :clients and :memory sections
def introspection
  uptime = Time.now - @server_up_since

  {
    server: {
      ruby_version: "#{RUBY_VERSION}p#{RUBY_PATCHLEVEL}",
      rdkit_version: RDKit::VERSION,
      multiplexing_api: 'select',
      process_id: Process.pid,
      tcp_port: @port,
      uptime_in_seconds: uptime.to_i,
      uptime_in_days: (uptime / (24 * 60 * 60)).to_i,
      hz: HZ,
    },
    clients: {
      blocked_clients: @blocked_clients.size,
      connected_clients: @clients.size,
      connected_clients_peak: @peak_connected_clients
    },
    memory: {
      used_memory_rss: used_memory_rss_in_mb,
      used_memory_peak: used_memory_peak_in_mb
    },
  }
end

#notice_signal ⇒ Object



89
90
91
92
93
94
95
96
# File 'lib/rdkit/server.rb', line 89

# Writes a single byte to the write end of the self-pipe, so that whatever is
# waiting on the read end wakes up. It is called from the trap blocks
# installed by #register_signal_handlers and therefore does nothing beyond
# one non-blocking write.
#
# @return [Integer, nil] bytes written, or nil when the write would block
def notice_signal
  @selfpipe[:writer].write_nonblock('.')
rescue Errno::EAGAIN
  # Ignore writes that would block: a full pipe already holds a wake-up byte.
rescue Errno::EINTR
  # Retry if another signal arrived while writing.
  retry
end

#pool ⇒ Object



212
213
214
# File 'lib/rdkit/server.rb', line 212

# Lazily builds and memoizes the worker thread pool.
#
# The size comes from the RDKIT_SERVER_THREAD_POOL_SIZE environment variable
# (converted with to_i) and falls back to 10 when the variable is unset.
#
# @return [Object] the pool returned by Thread.pool, reused on later calls
def pool
  return @pool if @pool

  configured_size = ENV['RDKIT_SERVER_THREAD_POOL_SIZE'] || 10
  @pool = Thread.pool(configured_size.to_i)
end

#register_signal_handlers ⇒ Object



81
82
83
84
85
86
87
# File 'lib/rdkit/server.rb', line 81

# Installs a trap for every entry of HANDLED_SIGNALS that this platform
# knows about.
#
# Each trap only enqueues the signal name and pokes the self-pipe via
# #notice_signal; the real handling happens later in #handle_signals.
#
# @return [Array] HANDLED_SIGNALS
def register_signal_handlers
  supported = ::Signal.list

  HANDLED_SIGNALS.each do |sig|
    # Skip names the platform does not define.
    next unless supported.include?(sig.to_s)

    trap(sig) do
      @signal_queue << sig
      notice_signal
    end
  end
end

#responder ⇒ Object



20
21
22
# File 'lib/rdkit/server.rb', line 20

# Returns the responder, falling back to the legacy @runner.
#
# When @responder is unset, @runner is adopted as the responder; if @runner is
# set, a deprecation notice is written to $stderr at that moment. Once a
# truthy responder has been memoized the notice is not repeated.
#
# @return [Object, nil] @responder, or @runner when no responder was set
def responder
  return @responder if @responder

  $stderr.puts("@runner is deprecated, use @responder instead") if @runner
  @responder = @runner
end

#select_db!(index) ⇒ Object



185
186
187
188
189
190
191
192
193
# File 'lib/rdkit/server.rb', line 185

# Makes the database with the given index current, creating it (and adding
# it to the known-database list) when it does not exist yet.
#
# @param index the value compared against each database's #index
# @return [Object] the database that is now current
def select_db!(index)
  chosen = @all_dbs.find { |candidate| candidate.index == index }

  unless chosen
    chosen = DB.new(index)
    @all_dbs << chosen
  end

  @current_db = chosen
end

#start ⇒ Object



62
63
64
65
66
67
68
69
70
# File 'lib/rdkit/server.rb', line 62

# Boots the server: runs the pre-flight check, installs the signal traps,
# binds the listening socket, then hands control to the acceptor.
# NOTE(review): sanity_check! and run_acceptor are defined outside this view;
# presumably run_acceptor holds the accept/select loop. Confirm before
# relying on whether this method returns.
def start
  sanity_check!

  register_signal_handlers

  # The listening socket is only created here, not in #initialize.
  @server_socket = TCPServer.new(@host, @port)

  run_acceptor
end

#stop ⇒ Object



72
73
74
75
# File 'lib/rdkit/server.rb', line 72

# Logs a shutdown warning and terminates the process with Kernel#exit
# (which raises SystemExit), so this method does not return normally.
def stop
  @logger.warn "shutting down..."
  exit
end

#terminate_gracefully ⇒ Object



143
144
145
146
147
148
149
# File 'lib/rdkit/server.rb', line 143

# Starts shutdown exactly once. The first call raises the @terminating flag
# and invokes #stop; later calls see the flag and do nothing.
#
# @return [Object, nil] the result of #stop on the first call, nil afterwards
def terminate_gracefully
  unless @terminating
    @terminating = true
    stop
  end
end