Class: OpenC3::JsonDRb
Overview
JsonDRb implements the JSON-RPC 2.0 Specification to provide an interface for both internal and external tools to access the OpenC3 server. It provides methods to install an access control list to control access to the API. It also limits the available methods to a known list of allowable API methods.
Constant Summary collapse
- MINIMUM_REQUEST_TIME =
Minimum amount of time in seconds to receive the JSON request, process it, and send the response. Request times less than this amount are set to the minimum.
0.0001
- STOP_SERVICE_TIMEOUT =
seconds to wait when stopping the service
10.0
- PUMA_THREAD_TIMEOUT =
seconds to wait for the puma threads to die
10.0
- SERVER_START_TIMEOUT =
seconds to wait for the server to start
15.0
- @@debug =
false
Instance Attribute Summary collapse
-
#method_whitelist ⇒ Array<String>
List of methods that should be allowed.
-
#object ⇒ ACL
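The object on which the requested API methods are invoked. (The original description, `attr_accessor :acl`, appears to be a stale leftover; `@acl` is commented out in the constructor.)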
-
#request_count ⇒ Integer
The number of JSON-RPC requests processed.
-
#thread ⇒ Thread
readonly
The server thread listening for incoming requests.
Class Method Summary collapse
- .debug=(value) ⇒ Object
-
.debug? ⇒ Boolean
Whether debug messages are enabled.
Instance Method Summary collapse
-
#add_request_time(request_time) ⇒ Object
Adds a request time to the list.
-
#average_request_time ⇒ Float
The average time in seconds for a JSON DRb request to be processed and the response sent.
-
#graceful_kill ⇒ Object
Gracefully kill the thread by stopping the server if it is running.
-
#initialize ⇒ JsonDRb
constructor
A new instance of JsonDRb.
-
#num_clients ⇒ Integer
Returns the number of connected clients.
-
#process_request(request_data:, request_headers:, start_time:) ⇒ Object
Process the JSON request data, execute the method, and create a response.
- #start_service(hostname = nil, port = nil, object = nil, max_threads = 1000) ⇒ Object
-
#stop_service ⇒ Object
Stops the DRb service by closing the socket and the processing thread.
Constructor Details
#initialize ⇒ JsonDRb
Returns a new instance of JsonDRb.
79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/openc3/io/json_drb.rb', line 79
# Creates a JsonDRb with no server thread, no target object and no method
# whitelist. The request statistics start out empty.
def initialize
  # Server thread, target object and whitelist are assigned later
  @thread = nil
  # @acl = nil
  @object = nil
  @method_whitelist = nil

  # Request statistics; @request_mutex guards the request time list
  @request_count = 0
  @request_times = []
  @request_times_index = 0
  @request_mutex = Mutex.new

  # Server handle; @server_mutex guards access to @server
  @server = nil
  @server_mutex = Mutex.new
end
Instance Attribute Details
#method_whitelist ⇒ Array<String>
Returns List of methods that should be allowed.
73 74 75 |
# File 'lib/openc3/io/json_drb.rb', line 73
# @return [Array<String>] List of methods that should be allowed
def method_whitelist
  @method_whitelist
end
#object ⇒ ACL
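Returns the object on which the requested API methods are invoked. (The original description, `attr_accessor :acl`, appears to be a stale leftover; `@acl` is commented out in the constructor.)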
77 78 79 |
# File 'lib/openc3/io/json_drb.rb', line 77
# @return [Object] The value held in @object
def object
  @object
end
#request_count ⇒ Integer
Returns The number of JSON-RPC requests processed.
71 72 73 |
# File 'lib/openc3/io/json_drb.rb', line 71
# @return [Integer] The number of JSON-RPC requests processed
def request_count
  @request_count
end
#thread ⇒ Thread (readonly)
Returns The server thread listening for incoming requests.
212 213 214 |
# File 'lib/openc3/io/json_drb.rb', line 212
# @return [Thread] The server thread listening for incoming requests
#   (nil while no thread has been assigned)
def thread
  @thread
end
Class Method Details
.debug=(value) ⇒ Object
245 246 247 |
# File 'lib/openc3/io/json_drb.rb', line 245
# Stores the given value in the @@debug class variable.
#
# @param value [Boolean] Whether debug messages are enabled
def self.debug=(value)
  @@debug = value
end
.debug? ⇒ Boolean
Returns Whether debug messages are enabled.
240 241 242 |
# File 'lib/openc3/io/json_drb.rb', line 240
# @return [Boolean] Whether debug messages are enabled (the current value
#   of the @@debug class variable)
def self.debug?
  @@debug
end
Instance Method Details
#add_request_time(request_time) ⇒ Object
Adds a request time to the list. A request time consists of the amount of time to receive the request, process it, and send the response. These times are used by the #average_request_time method to calculate an average request time.
220 221 222 223 224 225 226 227 |
# File 'lib/openc3/io/json_drb.rb', line 220
# Records one request time. Values below MINIMUM_REQUEST_TIME are stored as
# MINIMUM_REQUEST_TIME. The times are kept in a 100 slot buffer that wraps
# around, so the oldest entry is overwritten once 100 have been recorded.
#
# @param request_time [Float] Time in seconds taken by the request
def add_request_time(request_time)
  @request_mutex.synchronize do
    stored_time = request_time < MINIMUM_REQUEST_TIME ? MINIMUM_REQUEST_TIME : request_time
    @request_times[@request_times_index] = stored_time
    @request_times_index += 1
    # Wrap back to the first slot after the 100th entry
    @request_times_index = 0 if @request_times_index >= 100
  end
end
#average_request_time ⇒ Float
Returns The average time in seconds for a JSON DRb request to be processed and the response sent.
231 232 233 234 235 236 237 |
# File 'lib/openc3/io/json_drb.rb', line 231
# Computes the mean of the recorded request times while holding
# @request_mutex. Array#mean is not core Ruby; presumably it is provided
# by an OpenC3 core extension.
#
# @return [Float] The average time in seconds for a JSON DRb request to be
#   processed and the response sent
def average_request_time
  @request_mutex.synchronize { @request_times.mean }
end
#graceful_kill ⇒ Object
Gracefully kill the thread by stopping the server if it is running.
121 122 123 124 125 126 |
# File 'lib/openc3/io/json_drb.rb', line 121
# Stops the server, if there is one and it reports that it is running,
# while holding @server_mutex.
def graceful_kill
  @server_mutex.synchronize do
    @server.stop if @server && @server.running
  rescue
    # Any StandardError raised while stopping is ignored
  end
end
#num_clients ⇒ Integer
Returns the number of connected clients
94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/openc3/io/json_drb.rb', line 94
# Returns the number of connected clients.
#
# The count is taken from the "running" entry of @server.stats, which
# indicates the number of server threads running and therefore the number
# of clients connected.
#
# @return [Integer] Number of connected clients; 0 when there is no server
#   or the stats do not contain a "running" entry
def num_clients
  @server_mutex.synchronize do
    return 0 unless @server

    stats = @server.stats
    if stats.is_a?(Hash)
      # NOTE(review): newer Puma versions appear to return a Hash here
      # rather than a JSON string - confirm against the bundled version
      (stats[:running] || stats['running']).to_i
    else
      # String form looks like: { "backlog": 0, "running": 0 }
      # The capture is nil when the pattern is absent and nil.to_i is 0,
      # so a missing entry no longer raises NoMethodError.
      stats.to_s[/"running":\s*(\d+)/, 1].to_i
    end
  end
end
#process_request(request_data:, request_headers:, start_time:) ⇒ Object
Process the JSON request data, execute the method, and create a response.
256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 |
# File 'lib/openc3/io/json_drb.rb', line 256
# Process the JSON request data, execute the method, and create a response.
#
# The requested method (downcased) is only dispatched to @object when it
# passes the access check: with a whitelist set it must be included in
# @method_whitelist, otherwise it must not be listed in
# JsonRpcRequest::DANGEROUS_METHODS. A response is only created when the
# request carries an id.
#
# @param request_data [String] Request payload handed to JsonRpcRequest.from_json
#   (presumably the JSON encoded request body)
# @param request_headers [Hash] Handed to JsonRpcRequest.from_json
#   (presumably the HTTP request headers)
# @param start_time [Time] Handed to process_response along with the response
#   (presumably the time the request was received)
# @return [Array] Two elements: the response data (nil when no response was
#   created) and the error code (nil when no error response was created)
def process_request(request_data:, request_headers:, start_time:)
  @request_count += 1
  begin
    request = JsonRpcRequest.from_json(request_data, request_headers)
    response = nil
    error_code = nil
    response_data = nil
    method_name = request.method.downcase

    allowed =
      if @method_whitelist
        @method_whitelist.include?(method_name)
      else
        !JsonRpcRequest::DANGEROUS_METHODS.include?(method_name)
      end

    if allowed
      begin
        result =
          if request.keyword_params
            @object.public_send(method_name.intern, *request.params, **request.keyword_params)
          else
            @object.public_send(method_name.intern, *request.params)
          end
        response = JsonRpcSuccessResponse.new(result, request.id) if request.id
      # NOTE(review): rescues Exception, so failures that are not
      # StandardErrors are also turned into a response - confirm intended
      rescue Exception => error
        # Filter out the framework stack trace (rails, rack, puma etc)
        lines = error.formatted.split("\n")
        framework_index = lines.find_index { |row| row.include?('actionpack') || row.include?('activesupport') }
        # Log everything when no framework line is present
        lines = lines[0...framework_index] if framework_index
        Logger.error lines.join("\n")

        if request.id
          # Order matters: the first matching class decides the error code
          error_code, message =
            case error
            when NoMethodError
              [JsonRpcError::ErrorCode::METHOD_NOT_FOUND, "Method not found"]
            when ArgumentError
              [JsonRpcError::ErrorCode::INVALID_PARAMS, "Invalid params"]
            when AuthError
              [JsonRpcError::ErrorCode::AUTH_ERROR, error.message]
            when ForbiddenError
              [JsonRpcError::ErrorCode::FORBIDDEN_ERROR, error.message]
            when HazardousError
              [JsonRpcError::ErrorCode::HAZARDOUS_ERROR, error.message]
            else
              [JsonRpcError::ErrorCode::OTHER_ERROR, error.message]
            end
          response = JsonRpcErrorResponse.new(
            JsonRpcError.new(error_code, message, error), request.id
          )
        end
      end
    elsif request.id
      error_code = JsonRpcError::ErrorCode::OTHER_ERROR
      response = JsonRpcErrorResponse.new(
        JsonRpcError.new(error_code, "Cannot call unauthorized methods"), request.id
      )
    end

    response_data = process_response(response, start_time) if response
    return response_data, error_code
  rescue => error
    # Anything that fails outside of the method call itself (e.g. parsing
    # the request) is reported as an invalid request without an id
    error_code = JsonRpcError::ErrorCode::INVALID_REQUEST
    response = JsonRpcErrorResponse.new(JsonRpcError.new(error_code, "Invalid Request", error), nil)
    response_data = process_response(response, start_time)
    return response_data, error_code
  end
end
#start_service(hostname = nil, port = nil, object = nil, max_threads = 1000) ⇒ Object
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/openc3/io/json_drb.rb', line 131
# Starts the http server in a new thread and waits until it reports that it
# is running. Does nothing when a server is already registered, or when
# called without any of hostname, port and object (client use).
#
# @param hostname [String] Host to bind to; 'localhost' (any case) is
#   replaced by '127.0.0.1'
# @param port [Integer] Port to bind to
# @param object [Object] Stored in @object
# @param max_threads [Integer] Upper bound of the server thread range
#   passed to Puma ("0:max_threads")
# @raise [RuntimeError] When only some of hostname, port and object are
#   given, or when the server is not running within SERVER_START_TIMEOUT
def start_service(hostname = nil, port = nil, object = nil, max_threads = 1000)
  # Nothing to do if a server has already been registered
  already_started = @server_mutex.synchronize { @server ? true : false }
  return if already_started

  unless hostname && port && object
    raise "0 or 3 parameters must be given" if hostname || port || object

    # Client - Noop
    return
  end

  @object = object
  hostname = '127.0.0.1'.freeze if hostname.to_s.upcase == 'LOCALHOST'.freeze

  @thread = Thread.new do
    # Create an http server to accept requests from clients
    server_config = {
      Host: hostname,
      Port: port,
      Silent: true,
      Verbose: false,
      Threads: "0:#{max_threads}",
    }

    # The run call will block until the server is stopped.
    Rackup::Handler::Puma.run(JsonDrbRack.new(self), server_config) do |server|
      @server_mutex.synchronize { @server = server }
    end

    # Wait for all puma threads to stop before trying to close
    # the sockets
    wait_began = Time.now
    loop do
      puma_alive = Thread.list.any? { |candidate| candidate.inspect.match?(/puma/) }
      break unless puma_alive
      break if (Time.now - wait_began) > PUMA_THREAD_TIMEOUT

      sleep 0.25
    end

    # Puma doesn't clean up its own sockets after shutting down,
    # so we'll do that here.
    @server_mutex.synchronize do
      @server.binder.close if @server
    end

  # The address in use error is pretty typical if an existing
  # server is running so explicitly rescue this
  rescue Errno::EADDRINUSE
    @server = nil
    raise "Error binding to port #{port}.\n" \
          "Either another application is using this port\n" \
          "or the operating system is being slow cleaning up.\n" \
          "Make sure all sockets/streams are closed in all applications,\n" \
          "wait 1 minute and try again."
  # Something else went wrong which is fatal
  rescue => error
    @server = nil
    Logger.error "JsonDRb http server could not be started or unexpectedly died.\n#{error.formatted}"
    OpenC3.handle_fatal_exception(error)
  end

  # Wait for the server to be started in the thread before returning.
  server_started = false
  wait_began = Time.now
  until server_started || (Time.now - wait_began) >= SERVER_START_TIMEOUT
    sleep(0.1)
    @server_mutex.synchronize do
      server_started = true if @server && @server.running
    end
  end
  raise "JsonDRb http server could not be started." unless server_started
end
#stop_service ⇒ Object
Stops the DRb service by closing the socket and the processing thread
110 111 112 113 114 115 116 117 118 |
# File 'lib/openc3/io/json_drb.rb', line 110
# Stops the DRb service: kills the server thread through OpenC3.kill_thread
# and then clears both the thread and the server references.
def stop_service
  # Kill the server thread
  # parameters are owner, thread, graceful_timeout, timeout_interval, hard_timeout
  OpenC3.kill_thread(self, @thread, STOP_SERVICE_TIMEOUT, 0.1, STOP_SERVICE_TIMEOUT)
  @thread = nil
  @server_mutex.synchronize { @server = nil }
end