Class: Cosmos::JsonDRb

Inherits:
Object show all
Defined in:
lib/cosmos/io/json_drb.rb

Overview

JsonDRb implements the JSON-RPC 2.0 Specification to provide an interface for both internal and external tools to access the COSMOS server. It provides methods to install an access control list to control access to the API. It also limits the available methods to a known list of allowable API methods.

Constant Summary collapse

MINIMUM_REQUEST_TIME =

Minimum amount of time in seconds to receive the JSON request, process it, and send the response. Requests for less than this amount will be set to the minimum

0.0001
STOP_SERVICE_TIMEOUT =

seconds to wait when stopping the service

10.0
PUMA_THREAD_TIMEOUT =

seconds to wait for the puma threads to die

10.0
SERVER_START_TIMEOUT =

seconds to wait for the server to start

15.0
@@debug =
false

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeJsonDRb

Returns a new instance of JsonDRb.



70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/cosmos/io/json_drb.rb', line 70

def initialize
  @thread = nil
  @acl = nil
  @object = nil
  @method_whitelist = nil
  @request_count = 0
  @request_times = []
  @request_times_index = 0
  @request_mutex = Mutex.new
  @server = nil
  @server_mutex = Mutex.new
end

Instance Attribute Details

#aclACL

Returns The access control list.

Returns:

  • (ACL)

    The access control list



68
69
70
# File 'lib/cosmos/io/json_drb.rb', line 68

def acl
  @acl
end

#method_whitelistArray<String>

Returns List of methods that should be allowed.

Returns:

  • (Array<String>)

    List of methods that should be allowed



66
67
68
# File 'lib/cosmos/io/json_drb.rb', line 66

def method_whitelist
  @method_whitelist
end

#request_countInteger

Returns The number of JSON-RPC requests processed.

Returns:

  • (Integer)

    The number of JSON-RPC requests processed



64
65
66
# File 'lib/cosmos/io/json_drb.rb', line 64

def request_count
  @request_count
end

Class Method Details

.debug=(value) ⇒ Object

Parameters:

  • value (Boolean)

    Whether to enable debug messages



243
244
245
# File 'lib/cosmos/io/json_drb.rb', line 243

def self.debug=(value)
  @@debug = value
end

.debug?Boolean

Returns Whether debug messages are enabled.

Returns:

  • (Boolean)

    Whether debug messages are enabled



238
239
240
# File 'lib/cosmos/io/json_drb.rb', line 238

def self.debug?
  @@debug
end

Instance Method Details

#add_request_time(request_time) ⇒ Object

Adds a request time to the list. A request time consists of the amount of time to receive the request, process it, and send the response. These times are used by the #average_request_time method to calculate an average request time.

Parameters:

  • request_time (Float)

    Time in seconds for the data transmission



218
219
220
221
222
223
224
225
# File 'lib/cosmos/io/json_drb.rb', line 218

def add_request_time(request_time)
  @request_mutex.synchronize do
    request_time = MINIMUM_REQUEST_TIME if request_time < MINIMUM_REQUEST_TIME
    @request_times[@request_times_index] = request_time
    @request_times_index += 1
    @request_times_index = 0 if @request_times_index >= 100
  end
end

#average_request_timeFloat

Returns The average time in seconds for a JSON DRb request to be processed and the response sent.

Returns:

  • (Float)

    The average time in seconds for a JSON DRb request to be processed and the response sent.



229
230
231
232
233
234
235
# File 'lib/cosmos/io/json_drb.rb', line 229

def average_request_time
  avg = 0
  @request_mutex.synchronize do
    avg = @request_times.mean
  end
  avg
end

#graceful_killObject

Gracefully kill the thread



112
113
114
115
116
117
118
119
# File 'lib/cosmos/io/json_drb.rb', line 112

def graceful_kill
  @server_mutex.synchronize do
    begin
      @server.stop if @server and @server.running
    rescue
    end
  end
end

#num_clientsInteger

Returns the number of connected clients

Returns:

  • (Integer)

    The number of connected clients



85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/cosmos/io/json_drb.rb', line 85

def num_clients
  clients = 0
  @server_mutex.synchronize do
    if @server
      # @server.stats() returns a string like: { "backlog": 0, "running": 0 }
      # "running" indicates the number of server threads running, and
      # therefore the number of clients connected.
      stats = @server.stats()
      stats =~ /"running": \d*/
      clients = $&.split(":")[1].to_i
    end
  end
  return clients
end

#process_request(request_data, start_time) ⇒ Object

Process the JSON request data, execute the method, and create a response.

Parameters:

  • request_data (String)

    The JSON encoded request

  • start_time (Time)

    The time when the initial request was received

Returns:

  • response_data, error_code [String, Integer/nil] The JSON encoded response and error code



253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
# File 'lib/cosmos/io/json_drb.rb', line 253

def process_request(request_data, start_time)
  @request_count += 1
  STDOUT.puts request_data if JsonDRb.debug?
  begin
    request = JsonRpcRequest.from_json(request_data)
    response = nil
    error_code = nil
    response_data = nil

    if (@method_whitelist and @method_whitelist.include?(request.method.downcase())) or
       (!@method_whitelist and !JsonRpcRequest::DANGEROUS_METHODS.include?(request.method.downcase()))
      begin
        result = @object.send(request.method.downcase().intern, *request.params)
        if request.id
          response = JsonRpcSuccessResponse.new(result, request.id)
        end
      rescue Exception => error
        if request.id
          if NoMethodError === error
            error_code = JsonRpcError::ErrorCode::METHOD_NOT_FOUND
            response = JsonRpcErrorResponse.new(
              JsonRpcError.new(error_code, "Method not found", error), request.id)
          elsif ArgumentError === error
            error_code = JsonRpcError::ErrorCode::INVALID_PARAMS
            response = JsonRpcErrorResponse.new(
              JsonRpcError.new(error_code, "Invalid params", error), request.id)
          else
            error_code = JsonRpcError::ErrorCode::OTHER_ERROR
            response = JsonRpcErrorResponse.new(
              JsonRpcError.new(error_code, error.message, error), request.id)
          end
        end
      end
    else
      if request.id
        error_code = JsonRpcError::ErrorCode::OTHER_ERROR
        response = JsonRpcErrorResponse.new(
          JsonRpcError.new(error_code, "Cannot call unauthorized methods"), request.id)
      end
    end
    response_data = process_response(response, start_time) if response
    return response_data, error_code
  rescue => error
    error_code = JsonRpcError::ErrorCode::INVALID_REQUEST
    response = JsonRpcErrorResponse.new(JsonRpcError.new(error_code, "Invalid Request", error), nil)
    response_data = process_response(response, start_time)
    return response_data, error_code
  end
end

#start_service(hostname = nil, port = nil, object = nil, max_threads = 1000, system = nil) ⇒ Object

Parameters:

  • hostname (String) (defaults to: nil)

    The host to start the service on

  • port (Integer) (defaults to: nil)

    The port number to listen for connections

  • object (Object) (defaults to: nil)

    The object to send the DRb requests to. This object must either include the Cosmos::Script module or be the CmdTlmServer.



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/cosmos/io/json_drb.rb', line 126

def start_service(hostname = nil, port = nil, object = nil, max_threads = 1000, system = nil)
  server_started = false
  @server_mutex.synchronize do
    server_started = true if @server
  end
  return if server_started

  if hostname and port and object
    @object = object
    hostname = '127.0.0.1'.freeze if (hostname.to_s.upcase == 'LOCALHOST'.freeze)

    @thread = Thread.new do

      # Create an http server to accept requests from clients
      begin
        server_config = {
          :Host   => hostname,
          :Port   => port,
          :Silent => true,
          :Verbose => false,
          :Threads => "0:#{max_threads}",
        }

        # The run call will block until the server is stopped.
        Rack::Handler::Puma.run(JsonDrbRack.new(self, system), server_config) do |server|
          @server_mutex.synchronize do
            @server = server
          end
        end

        # Wait for all puma threads to stop before trying to close
        # the sockets
        start_time = Time.now
        while true
          puma_threads = false
          Thread.list.each {|thread| puma_threads = true if thread.inspect.match(/puma/)}
          break if !puma_threads
          break if (Time.now - start_time) > PUMA_THREAD_TIMEOUT
          sleep 0.25
        end

        # Puma doesn't clean up it's own sockets after shutting down,
        # so we'll do that here.
        @server_mutex.synchronize do
          @server.binder.close() if @server
        end

      # The address in use error is pretty typical if an existing
      # CmdTlmServer is running so explicitly rescue this
      rescue Errno::EADDRINUSE
        @server = nil
        raise "Error binding to port #{port}.\n" +
              "Either another application is using this port\n" +
              "or the operating system is being slow cleaning up.\n" +
              "Make sure all sockets/streams are closed in all applications,\n" +
              "wait 1 minute and try again."
      # Something else went wrong which is fatal
      rescue => error
        @server = nil
        Logger.error "JsonDRb http server could not be started or unexpectedly died.\n#{error.formatted}"
        Cosmos.handle_fatal_exception(error)
      end
    end

    # Wait for the server to be started in the thread before returning.
    start_time = Time.now
    while ((Time.now - start_time) < SERVER_START_TIMEOUT) and !server_started
      sleep(0.1)
      @server_mutex.synchronize do
        server_started = true if @server and @server.running
      end
    end
    raise "JsonDRb http server could not be started." unless server_started

  elsif hostname or port or object
    raise "0 or 3 parameters must be given"
  else
    # Client - Noop
  end
end

#stop_serviceObject

Stops the DRb service by closing the socket and the processing thread



101
102
103
104
105
106
107
108
109
# File 'lib/cosmos/io/json_drb.rb', line 101

def stop_service
  # Kill the server thread
  # parameters are owner, thread, graceful_timeout, timeout_interval, hard_timeout
  Cosmos.kill_thread(self, @thread, STOP_SERVICE_TIMEOUT, 0.1, STOP_SERVICE_TIMEOUT)
  @thread = nil
  @server_mutex.synchronize do
    @server = nil
  end
end

#threadThread

Returns The server thread listening for incoming requests.

Returns:

  • (Thread)

    The server thread listening for incoming requests



208
209
210
# File 'lib/cosmos/io/json_drb.rb', line 208

def thread
  @thread
end