Class: Cosmos::JsonDRb

Inherits:
Object show all
Defined in:
lib/cosmos/io/json_drb.rb

Overview

JsonDRb implements the JSON-RPC 2.0 Specification to provide an interface for both internal and external tools to access the COSMOS server. It provides methods to install an access control list to control access to the API. It also limits the available methods to a known list of allowable API methods.

Constant Summary collapse

MINIMUM_REQUEST_TIME =
0.0001
@@debug =
false

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeJsonDRb

Returns a new instance of JsonDRb.



40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/cosmos/io/json_drb.rb', line 40

def initialize
  @listen_socket = nil
  @thread = nil
  @acl = nil
  @object = nil
  @method_whitelist = nil
  @request_count = 0
  @request_times = []
  @request_times_index = 0
  @request_mutex = Mutex.new
  @num_clients = 0
  @thread_reader, @thread_writer = IO.pipe
end

Instance Attribute Details

#aclACL

Returns The access control list.

Returns:

  • (ACL)

    The access control list



38
39
40
# File 'lib/cosmos/io/json_drb.rb', line 38

def acl
  @acl
end

#method_whitelistArray<String>

Returns List of methods that should be allowed.

Returns:

  • (Array<String>)

    List of methods that should be allowed



36
37
38
# File 'lib/cosmos/io/json_drb.rb', line 36

def method_whitelist
  @method_whitelist
end

#num_clientsInteger

Returns The number of clients currently connected to the server.

Returns:

  • (Integer)

    The number of clients currently connected to the server



34
35
36
# File 'lib/cosmos/io/json_drb.rb', line 34

def num_clients
  @num_clients
end

#request_countInteger

Returns The number of JSON-RPC requests processed.

Returns:

  • (Integer)

    The number of JSON-RPC requests processed



32
33
34
# File 'lib/cosmos/io/json_drb.rb', line 32

def request_count
  @request_count
end

Class Method Details

.debug=(value) ⇒ Object

Parameters:

  • value (Boolean)

    Whether to enable debug messages



242
243
244
# File 'lib/cosmos/io/json_drb.rb', line 242

def self.debug=(value)
  @@debug = value
end

.debug?Boolean

Returns Whether debug messages are enabled.

Returns:

  • (Boolean)

    Whether debug messages are enabled



237
238
239
# File 'lib/cosmos/io/json_drb.rb', line 237

def self.debug?
  @@debug
end

.get_at_least_x_bytes_of_data(socket, current_data, required_num_bytes) ⇒ Object

Parameters:

  • socket (Socket)

    The socket to the client

  • current_data (String)

    Binary data read from the socket

  • required_num_bytes (Integer)

    The minimum number of bytes to read before returning



194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/cosmos/io/json_drb.rb', line 194

def self.get_at_least_x_bytes_of_data(socket, current_data, required_num_bytes)
  while (current_data.length < required_num_bytes)
    begin
      data = socket.recv_nonblock(65535)
      if data.length == 0
        current_data.replace('')
        return
      end
      current_data << data
    rescue Errno::EAGAIN, Errno::EWOULDBLOCK
      IO.fast_select([socket], nil, nil, nil)
      retry
    end
  end
end

.receive_message(socket, data) ⇒ String

Returns The request message.

Parameters:

  • socket (Socket)

    The socket to the client

  • data (String)

    Binary data which has already been read from the socket.

Returns:

  • (String)

    The request message



171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/cosmos/io/json_drb.rb', line 171

def self.receive_message(socket, data)
  self.get_at_least_x_bytes_of_data(socket, data, 4)
  if data.length >= 4
    length = data[0..3].unpack('N')[0]
    data.replace(data[4..-1])
  else
    return nil
  end

  self.get_at_least_x_bytes_of_data(socket, data, length)
  if data.length >= length
    message = data[0..(length - 1)]
    data.replace(data[length..-1])
    return message
  else
    return nil
  end
end

.send_data(socket, data, send_timeout = 10.0) ⇒ Object

Parameters:

  • socket (Socket)

    The socket to the client

  • data (String)

    Binary data to send to the socket

  • send_timeout (Float) (defaults to: 10.0)

    The number of seconds to wait for the send to complete



214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# File 'lib/cosmos/io/json_drb.rb', line 214

def self.send_data(socket, data, send_timeout = 10.0)
  num_bytes_to_send = data.length + 4
  total_bytes_sent = 0
  bytes_sent = 0
  data_to_send = [data.length].pack('N') << data.clone

  loop do
    begin
      bytes_sent = socket.write_nonblock(data_to_send[total_bytes_sent..-1])
    rescue Errno::EAGAIN, Errno::EWOULDBLOCK
      result = IO.fast_select(nil, [socket], nil, send_timeout)
      if result
        retry
      else
        raise Timeout::Error, "Send Timeout"
      end
    end
    total_bytes_sent += bytes_sent
    break if total_bytes_sent >= num_bytes_to_send
  end
end

Instance Method Details

#add_request_time(request_time) ⇒ Object

Adds a request time to the list. A request time consists of the amount of time to receive the request, process it, and send the response. These times are used by the #average_request_time method to calculate an average request time.

Parameters:

  • request_time (Float)

    Time in seconds for the data transmission



148
149
150
151
152
153
154
155
# File 'lib/cosmos/io/json_drb.rb', line 148

def add_request_time(request_time)
  @request_mutex.synchronize do
    request_time = MINIMUM_REQUEST_TIME if request_time < MINIMUM_REQUEST_TIME
    @request_times[@request_times_index] = request_time
    @request_times_index += 1
    @request_times_index = 0 if @request_times_index >= 100
  end
end

#average_request_timeFloat

Returns The average time in seconds for a JSON DRb request to be processed and the response sent.

Returns:

  • (Float)

    The average time in seconds for a JSON DRb request to be processed and the response sent.



159
160
161
162
163
164
165
# File 'lib/cosmos/io/json_drb.rb', line 159

def average_request_time
  avg = 0
  @request_mutex.synchronize do
    avg = @request_times.mean
  end
  avg
end

#graceful_killObject

Gracefully kill the thread



63
64
65
# File 'lib/cosmos/io/json_drb.rb', line 63

def graceful_kill
  @thread_writer.write('.') if @thread
end

#start_service(hostname = nil, port = nil, object = nil) ⇒ Object

Parameters:

  • hostname (String) (defaults to: nil)

    The host to start the service on

  • port (Integer) (defaults to: nil)

    The port number to listen for connections

  • object (Object) (defaults to: nil)

    The object to send the DRb requests to. This object must either include the Cosmos::Script module or be the CmdTlmServer.



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/cosmos/io/json_drb.rb', line 72

def start_service(hostname = nil, port = nil, object = nil)
  if hostname and port and object
    @object = object
    hostname = '127.0.0.1' if (hostname.to_s.upcase == 'LOCALHOST')

    # Create a socket to accept connections from clients
    begin
      @listen_socket = TCPServer.new(hostname, port)
      @listen_socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, 1) unless Kernel.is_windows?
    # The address is use error is pretty typical if an existing
    # CmdTlmServer is running so explicitly rescue this
    rescue Errno::EADDRINUSE
      raise "Error binding to port #{port}.\n" +
            "Either another application is using this port\n" +
            "or the operating system is being slow cleaning up.\n" +
            "Make sure all sockets/streams are closed in all applications,\n" +
            "wait 1 minute and try again."
    # Something else went wrong which is fatal
    rescue => error
      Logger.error "JsonDRb listen thread unable to be created.\n#{error.formatted}"
      Cosmos.handle_fatal_exception(error)
    end

    # Start the listen thread which accepts connections
    @thread = Thread.new do
      begin
        while true
          begin
            socket = @listen_socket.accept_nonblock
          rescue Errno::EAGAIN, Errno::ECONNABORTED, Errno::EINTR, Errno::EWOULDBLOCK
            read_ready, _ = IO.select([@listen_socket, @thread_reader])
            if read_ready and read_ready.include?(@thread_reader)
              begin
                # Thread should be killed - Cleanout thread_reader first
                # Don't let this break anything else though
                @thread_reader.read(1)
              rescue Exception
                # Oh well - create a clean pipe in case we need one
                @thread_reader, @thread_writer = IO.pipe
              end
              break
            else
              retry
            end
          end

          if @acl and !@acl.allow_socket?(socket)
            socket.close
            next
          end
          # Create new thread for new connection
          create_client_thread(socket)
        end
      rescue Exception => error
        Logger.error "JsonDRb listen thread unexpectedly died.\n#{error.formatted}"
        Cosmos.handle_fatal_exception(error)
      end
    end
  elsif hostname or port or object
    raise "0 or 3 parameters must be given"
  else
    # Client - Noop
  end
end

#stop_serviceObject

Stops the DRb service by closing the socket and the processing thread



55
56
57
58
59
60
# File 'lib/cosmos/io/json_drb.rb', line 55

def stop_service
  Cosmos.kill_thread(self, @thread)
  @thread = nil
  @listen_socket.close if @listen_socket and !@listen_socket.closed?
  @listen_socket = nil
end

#threadThread

Returns The server thread listening for incoming requests.

Returns:

  • (Thread)

    The server thread listening for incoming requests



138
139
140
# File 'lib/cosmos/io/json_drb.rb', line 138

def thread
  @thread
end