Class: Cosmos::JsonDRb
Overview
JsonDRb implements the JSON-RPC 2.0 Specification to provide an interface for both internal and external tools to access the COSMOS server. It provides methods to install an access control list to control access to the API. It also limits the available methods to a known list of allowable API methods.
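As a rough illustration of the overview, the sketch below builds a JSON-RPC 2.0 request and rejects any method that is not on an allow-list. The helper names (`build_request`, `check_allowed`), the list contents, and the parameter strings are assumptions made for this example. How JsonDRb itself reports a rejected method is not shown on this page, so the standard JSON-RPC "Method not found" code (-32601) is used here.

```ruby
require 'json'

# Hypothetical allow-list; in JsonDRb the real list is held in #method_whitelist.
ALLOWED_METHODS = ['cmd', 'tlm'].freeze

# Build a JSON-RPC 2.0 request object and serialize it to a JSON string.
def build_request(method_name, params, id)
  JSON.generate('jsonrpc' => '2.0', 'method' => method_name, 'params' => params, 'id' => id)
end

# Return nil when the requested method is allowed, otherwise a JSON-RPC 2.0
# error response (as a JSON string) that echoes the request id.
def check_allowed(request_json)
  request = JSON.parse(request_json)
  return nil if ALLOWED_METHODS.include?(request['method'])
  JSON.generate(
    'jsonrpc' => '2.0',
    'error' => { 'code' => -32601, 'message' => 'Method not found' },
    'id' => request['id']
  )
end

ok_request = build_request('tlm', ['INST HEALTH_STATUS TEMP1'], 1)
bad_request = build_request('system', ['ls'], 2)
```

`check_allowed(ok_request)` returns nil, while `check_allowed(bad_request)` returns an error object whose id matches the rejected request.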
Constant Summary
- MINIMUM_REQUEST_TIME = 0.0001
- FAST_READ = (RUBY_VERSION > "2.1")
- @@debug = false
Instance Attribute Summary
- #acl ⇒ ACL
  The access control list.
- #method_whitelist ⇒ Array<String>
  List of methods that should be allowed.
- #request_count ⇒ Integer
  The number of JSON-RPC requests processed.
Class Method Summary
- .debug=(value) ⇒ Object
- .debug? ⇒ Boolean
  Whether debug messages are enabled.
- .get_at_least_x_bytes_of_data(socket, current_data, required_num_bytes, pipe_reader) ⇒ Object
- .receive_message(socket, data, pipe_reader) ⇒ String
  The request message.
- .send_data(socket, data, send_timeout = 10.0) ⇒ Object
Instance Method Summary
- #add_request_time(request_time) ⇒ Object
  Adds a request time to the list.
- #average_request_time ⇒ Float
  The average time in seconds for a JSON DRb request to be processed and the response sent.
- #graceful_kill ⇒ Object
  Gracefully kills the thread.
- #initialize ⇒ JsonDRb (constructor)
  A new instance of JsonDRb.
- #num_clients ⇒ Integer
  Returns the number of connected clients.
- #start_service(hostname = nil, port = nil, object = nil) ⇒ Object
- #stop_service ⇒ Object
  Stops the DRb service by closing the socket and the processing thread.
- #thread ⇒ Thread
  The server thread listening for incoming requests.
Constructor Details
#initialize ⇒ JsonDRb
Returns a new instance of JsonDRb.
# File 'lib/cosmos/io/json_drb.rb', line 39
def initialize
  @listen_socket = nil
  @thread = nil
  @acl = nil
  @object = nil
  @method_whitelist = nil
  @request_count = 0
  @request_times = []
  @request_times_index = 0
  @request_mutex = Mutex.new
  @client_sockets = []
  @client_threads = []
  @client_pipe_writers = []
  @client_mutex = Mutex.new
  @thread_reader, @thread_writer = IO.pipe
end
Instance Attribute Details
#acl ⇒ ACL
Returns the access control list.
# File 'lib/cosmos/io/json_drb.rb', line 37
def acl
  @acl
end
#method_whitelist ⇒ Array<String>
Returns the list of methods that should be allowed.
# File 'lib/cosmos/io/json_drb.rb', line 35
def method_whitelist
  @method_whitelist
end
#request_count ⇒ Integer
Returns the number of JSON-RPC requests processed.
# File 'lib/cosmos/io/json_drb.rb', line 33
def request_count
  @request_count
end
Class Method Details
.debug=(value) ⇒ Object
# File 'lib/cosmos/io/json_drb.rb', line 272
def self.debug=(value)
  @@debug = value
end
.debug? ⇒ Boolean
Returns whether debug messages are enabled.
# File 'lib/cosmos/io/json_drb.rb', line 267
def self.debug?
  @@debug
end
.get_at_least_x_bytes_of_data(socket, current_data, required_num_bytes, pipe_reader) ⇒ Object
# File 'lib/cosmos/io/json_drb.rb', line 220
def self.get_at_least_x_bytes_of_data(socket, current_data, required_num_bytes, pipe_reader)
  while (current_data.length < required_num_bytes)
    if FAST_READ
      data = socket.read_nonblock(65535, exception: false)
      raise EOFError, 'end of file reached' unless data
      if data == :wait_readable
        IO.fast_select([socket, pipe_reader], nil, nil, nil)
      else
        current_data << data
      end
    else
      begin
        current_data << socket.read_nonblock(65535)
      rescue IO::WaitReadable
        IO.fast_select([socket, pipe_reader], nil, nil, nil)
      end
    end
  end
end
.receive_message(socket, data, pipe_reader) ⇒ String
Returns the request message.
# File 'lib/cosmos/io/json_drb.rb', line 196
def self.receive_message(socket, data, pipe_reader)
  self.get_at_least_x_bytes_of_data(socket, data, 4, pipe_reader)
  if data.length >= 4
    length = data[0..3].unpack('N'.freeze)[0]
    data.replace(data[4..-1])
  else
    return nil
  end
  self.get_at_least_x_bytes_of_data(socket, data, length, pipe_reader)
  if data.length >= length
    message = data[0..(length - 1)]
    data.replace(data[length..-1])
    return message
  else
    return nil
  end
end
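The listing above implies a simple wire format: a 4-byte big-endian length (`'N'`) followed by that many bytes of message. The sketch below shows the same framing against an in-memory buffer instead of a socket. `frame` and `extract_message` are hypothetical helper names. Unlike `receive_message`, this version leaves the buffer untouched when the message is incomplete.

```ruby
# Prefix a payload with its byte length as a 4-byte big-endian unsigned integer.
def frame(payload)
  [payload.bytesize].pack('N') << payload.b
end

# Remove one complete message from the front of buffer and return it.
# Returns nil, leaving buffer unchanged, if a full message has not arrived yet.
def extract_message(buffer)
  return nil if buffer.bytesize < 4
  length = buffer.byteslice(0, 4).unpack('N')[0]
  return nil if buffer.bytesize < 4 + length
  message = buffer.byteslice(4, length)
  # Keep whatever follows this message for the next call.
  buffer.replace(buffer.byteslice(4 + length, buffer.bytesize))
  message
end

# Two messages arriving back to back in one read.
buffer = frame('{"id":1}') + frame('hello')
first = extract_message(buffer)
second = extract_message(buffer)
third = extract_message(buffer)

# A read that stopped partway through a message.
partial = frame('hello').byteslice(0, 6)
incomplete = extract_message(partial)
```

Because the leftover bytes stay in the buffer, the caller can append the next chunk read from the socket and call `extract_message` again.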
.send_data(socket, data, send_timeout = 10.0) ⇒ Object
# File 'lib/cosmos/io/json_drb.rb', line 244
def self.send_data(socket, data, send_timeout = 10.0)
  num_bytes_to_send = data.length + 4
  total_bytes_sent = 0
  bytes_sent = 0
  data_to_send = [data.length].pack('N'.freeze) << data.clone
  loop do
    begin
      bytes_sent = socket.write_nonblock(data_to_send[total_bytes_sent..-1])
    rescue Errno::EAGAIN, Errno::EWOULDBLOCK
      result = IO.fast_select(nil, [socket], nil, send_timeout)
      if result
        retry
      else
        raise Timeout::Error, "Send Timeout"
      end
    end
    total_bytes_sent += bytes_sent
    break if total_bytes_sent >= num_bytes_to_send
  end
end
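The send path can be tried without COSMOS by writing to a pipe. The sketch below is a simplified variant and not the library's code. `send_frame` is a hypothetical name. `IO.fast_select` is not part of core Ruby and is presumably a COSMOS extension, so `IO.select` stands in for it. The sketch uses `bytesize` so that the prefix counts bytes even for multibyte strings.

```ruby
require 'timeout'

# Write a length-prefixed frame using non-blocking writes. If the peer stops
# draining the stream for longer than send_timeout seconds, raise Timeout::Error.
def send_frame(io, data, send_timeout = 10.0)
  frame = [data.bytesize].pack('N') << data.b
  total_sent = 0
  while total_sent < frame.bytesize
    begin
      # write_nonblock may accept only part of the data; it returns the count.
      total_sent += io.write_nonblock(frame.byteslice(total_sent, frame.bytesize))
    rescue IO::WaitWritable
      # Buffer is full: wait until writable, or give up after the timeout.
      ready = IO.select(nil, [io], nil, send_timeout)
      raise Timeout::Error, 'Send Timeout' unless ready
    end
  end
  total_sent
end

reader, writer = IO.pipe
sent = send_frame(writer, 'ping')
header = reader.read(4)
body = reader.read(header.unpack('N')[0])
```

If nothing reads from the other end, a large payload fills the pipe buffer and the call fails with `Timeout::Error` once `send_timeout` elapses.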
Instance Method Details
#add_request_time(request_time) ⇒ Object
Adds a request time to the list. A request time consists of the amount of time to receive the request, process it, and send the response. These times are used by the #average_request_time method to calculate an average request time.
# File 'lib/cosmos/io/json_drb.rb', line 172
def add_request_time(request_time)
  @request_mutex.synchronize do
    request_time = MINIMUM_REQUEST_TIME if request_time < MINIMUM_REQUEST_TIME
    @request_times[@request_times_index] = request_time
    @request_times_index += 1
    @request_times_index = 0 if @request_times_index >= 100
  end
end
#average_request_time ⇒ Float
Returns the average time in seconds for a JSON DRb request to be processed and the response sent.
# File 'lib/cosmos/io/json_drb.rb', line 183
def average_request_time
  avg = 0
  @request_mutex.synchronize do
    avg = @request_times.mean
  end
  avg
end
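Taken together, #add_request_time and #average_request_time behave like a fixed window of the 100 most recent samples, each clamped to MINIMUM_REQUEST_TIME. The standalone sketch below reproduces that behavior under a hypothetical class name, `RequestTimer`. `Array#mean` is not part of core Ruby and is presumably a COSMOS extension, so the sketch computes the mean with `sum`. Returning 0.0 for an empty window is a choice made here, not necessarily what JsonDRb does.

```ruby
class RequestTimer
  MINIMUM_REQUEST_TIME = 0.0001
  WINDOW = 100 # number of slots before the write index wraps around

  def initialize
    @times = []
    @index = 0
    @mutex = Mutex.new
  end

  # Record one sample, overwriting the oldest slot once the window is full.
  def add(request_time)
    @mutex.synchronize do
      request_time = MINIMUM_REQUEST_TIME if request_time < MINIMUM_REQUEST_TIME
      @times[@index] = request_time
      @index = (@index + 1) % WINDOW
    end
  end

  # Mean of the samples currently in the window (0.0 if none, by assumption).
  def average
    @mutex.synchronize do
      return 0.0 if @times.empty?
      @times.sum / @times.length
    end
  end
end

timer = RequestTimer.new
100.times { timer.add(1.0) }
50.times { timer.add(3.0) } # overwrites the 50 oldest samples
windowed_average = timer.average

clamped = RequestTimer.new
clamped.add(0.0) # below the minimum, so stored as MINIMUM_REQUEST_TIME
```

After 150 samples the window holds fifty 3.0 values and fifty 1.0 values, so only the most recent 100 contribute to the average.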
#graceful_kill ⇒ Object
Gracefully kills the listen thread by writing a byte to the pipe that the thread monitors, which causes its accept loop to exit.
# File 'lib/cosmos/io/json_drb.rb', line 93
def graceful_kill
  @thread_writer.write('.') if @thread
end
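Writing a byte to a pipe is a common way to wake a thread that is blocked in `IO.select`. The minimal sketch below shows only that mechanism. The variable names and the `events` queue are illustrative and are not part of JsonDRb.

```ruby
reader, writer = IO.pipe
events = Queue.new

worker = Thread.new do
  # Block until the pipe becomes readable; no polling or timeout is needed.
  ready, = IO.select([reader])
  events << :woken if ready && ready.include?(reader)
end

writer.write('.') # the same one-byte signal that #graceful_kill writes
worker.join(2)
woken = events.size == 1
```

The thread exits on its own once it sees the byte, so it gets the chance to clean up instead of being killed mid-operation.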
#num_clients ⇒ Integer
Returns the number of connected clients.
# File 'lib/cosmos/io/json_drb.rb', line 58
def num_clients
  @client_threads.length
end
#start_service(hostname = nil, port = nil, object = nil) ⇒ Object
# File 'lib/cosmos/io/json_drb.rb', line 102
def start_service(hostname = nil, port = nil, object = nil)
  if hostname and port and object
    @thread_reader, @thread_writer = IO.pipe
    @object = object
    hostname = '127.0.0.1'.freeze if (hostname.to_s.upcase == 'LOCALHOST'.freeze)
    # Create a socket to accept connections from clients
    begin
      @listen_socket = TCPServer.new(hostname, port)
      @listen_socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, 1) unless Kernel.is_windows?
    # The address in use error is pretty typical if an existing
    # CmdTlmServer is running so explicitly rescue this
    rescue Errno::EADDRINUSE
      raise "Error binding to port #{port}.\n" +
            "Either another application is using this port\n" +
            "or the operating system is being slow cleaning up.\n" +
            "Make sure all sockets/streams are closed in all applications,\n" +
            "wait 1 minute and try again."
    # Something else went wrong which is fatal
    rescue => error
      Logger.error "JsonDRb listen thread unable to be created.\n#{error.formatted}"
      Cosmos.handle_fatal_exception(error)
    end
    # Start the listen thread which accepts connections
    @thread = Thread.new do
      begin
        while true
          begin
            socket = @listen_socket.accept_nonblock
          rescue Errno::EAGAIN, Errno::ECONNABORTED, Errno::EINTR, Errno::EWOULDBLOCK
            read_ready, _ = IO.select([@listen_socket, @thread_reader])
            if read_ready and read_ready.include?(@thread_reader)
              # Thread should be killed
              break
            else
              retry
            end
          end
          if @acl and !@acl.allow_socket?(socket)
            Cosmos.close_socket(socket)
            next
          end
          # Create new thread for new connection
          create_client_thread(socket)
        end
      rescue Exception => error
        Logger.error "JsonDRb listen thread unexpectedly died.\n#{error.formatted}"
        Cosmos.handle_fatal_exception(error)
      end
    end
  elsif hostname or port or object
    raise "0 or 3 parameters must be given"
  else
    # Client - Noop
  end
end
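The listen thread combines a non-blocking accept with a wake-up pipe and an access check. The sketch below reproduces that loop on the loopback interface without COSMOS. The `allow` lambda is a hypothetical stand-in for `@acl.allow_socket?`, and the `accepted` queue stands in for `create_client_thread`.

```ruby
require 'socket'
require 'timeout'

server = TCPServer.new('127.0.0.1', 0) # port 0 lets the OS choose a free port
port = server.addr[1]
stop_reader, stop_writer = IO.pipe
accepted = Queue.new

# Hypothetical stand-in for an ACL check: only loopback peers are allowed.
allow = ->(sock) { sock.peeraddr[3] == '127.0.0.1' }

listener = Thread.new do
  loop do
    begin
      socket = server.accept_nonblock
    rescue IO::WaitReadable, Errno::ECONNABORTED, Errno::EINTR
      # Nothing to accept yet: sleep until a client connects or stop is signaled.
      ready, = IO.select([server, stop_reader])
      break if ready && ready.include?(stop_reader)
      retry
    end
    if allow.call(socket)
      accepted << socket # the real service would start a client thread here
    else
      socket.close
    end
  end
end

client = TCPSocket.new('127.0.0.1', port)
peer = Timeout.timeout(2) { accepted.pop } # wait for the listener to accept
stop_writer.write('.')                     # ask the listener to exit
listener.join(2)
listener_stopped = !listener.alive?
[client, peer, server].each(&:close)
```

Selecting on both the listening socket and the pipe lets the thread block with no timeout and still be stopped promptly.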
#stop_service ⇒ Object
Stops the DRb service by killing the listen thread, closing the listen socket, and shutting down all client sockets and threads.
# File 'lib/cosmos/io/json_drb.rb', line 63
def stop_service
  Cosmos.kill_thread(self, @thread)
  @thread = nil
  Cosmos.close_socket(@listen_socket)
  @listen_socket = nil
  client_threads = nil
  @client_mutex.synchronize do
    @client_sockets.each do |client_socket|
      Cosmos.close_socket(client_socket)
    end
    @client_pipe_writers.each do |client_pipe_writer|
      client_pipe_writer.write('.')
    end
    client_threads = @client_threads.clone
  end
  # This cannot be inside of the client_mutex or the threads will not
  # be able to shutdown because they will stick on the client_mutex
  client_threads.each do |client_thread|
    Cosmos.kill_thread(self, client_thread)
  end
  @client_mutex.synchronize do
    @client_threads.clear
    @client_sockets.clear
    @client_pipe_writers.clear
  end
end
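The comment in the listing explains why the client threads are stopped outside `@client_mutex`. The sketch below illustrates that ordering. It assumes, as the comment implies, that each worker takes the same mutex while it exits. The worker body is invented for the example, and `Thread#join` stands in for `Cosmos.kill_thread`.

```ruby
mutex = Mutex.new
threads = []
stop_reader, stop_writer = IO.pipe

3.times do
  t = Thread.new do
    IO.select([stop_reader]) # wait for the stop signal
    # Deregister on exit; this needs the same mutex the stopper uses.
    mutex.synchronize { threads.delete(Thread.current) }
  end
  mutex.synchronize { threads << t }
end

snapshot = nil
mutex.synchronize do
  stop_writer.write('.')     # signal every worker
  snapshot = threads.clone   # copy the list while it is stable
end

# Join outside the lock: each worker needs the mutex to finish, so waiting
# for them while holding it would deadlock.
snapshot.each { |t| t.join(2) }
mutex.synchronize { threads.clear }
all_stopped = snapshot.none?(&:alive?)
```

Taking a copy under the lock and waiting on the copy afterwards keeps the shared list consistent without holding the lock across a blocking call.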
#thread ⇒ Thread
Returns the server thread listening for incoming requests.
# File 'lib/cosmos/io/json_drb.rb', line 162
def thread
  @thread
end