Class: Cosmos::JsonDRb

Inherits:
Object show all
Defined in:
lib/cosmos/io/json_drb.rb

Overview

JsonDRb implements the JSON-RPC 2.0 Specification to provide an interface for both internal and external tools to access the COSMOS server. It provides methods to install an access control list to control access to the API. It also limits the available methods to a known list of allowable API methods.

Constant Summary collapse

# Smallest request time, in seconds, that #add_request_time will record;
# shorter samples are raised to this value.
MINIMUM_REQUEST_TIME =
0.0001
# Selects the read_nonblock(exception: false) path in
# .get_at_least_x_bytes_of_data. Note this is a plain String comparison of
# the interpreter version, not a numeric one.
FAST_READ =
(RUBY_VERSION > "2.1")
# Class-wide debug flag, written by .debug= and read back by .debug?
@@debug =
false

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ JsonDRb

Returns a new instance of JsonDRb.



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/cosmos/io/json_drb.rb', line 39

# Builds an idle JsonDRb: no listen socket, listen thread, ACL, target object
# or method whitelist is set until the caller configures them.
def initialize
  # Server-side handles, all unset at construction
  @object = nil
  @acl = nil
  @method_whitelist = nil
  @listen_socket = nil
  @thread = nil

  # Request statistics; @request_times is accessed under @request_mutex
  @request_mutex = Mutex.new
  @request_count = 0
  @request_times_index = 0
  @request_times = []

  # Per-client bookkeeping; #stop_service takes @client_mutex around these
  @client_mutex = Mutex.new
  @client_threads = []
  @client_sockets = []
  @client_pipe_writers = []

  # Pipe pair; #graceful_kill writes to @thread_writer
  @thread_reader, @thread_writer = IO.pipe
end

Instance Attribute Details

#acl ⇒ ACL

Returns The access control list.

Returns:

  • (ACL)

    The access control list



37
38
39
# File 'lib/cosmos/io/json_drb.rb', line 37

# @return [ACL] The access control list (nil until one is installed)
def acl
  return @acl
end

#method_whitelist ⇒ Array<String>

Returns List of methods that should be allowed.

Returns:

  • (Array<String>)

    List of methods that should be allowed



35
36
37
# File 'lib/cosmos/io/json_drb.rb', line 35

# @return [Array<String>] List of methods that should be allowed
def method_whitelist
  return @method_whitelist
end

#request_count ⇒ Integer

Returns The number of JSON-RPC requests processed.

Returns:

  • (Integer)

    The number of JSON-RPC requests processed



33
34
35
# File 'lib/cosmos/io/json_drb.rb', line 33

# @return [Integer] The number of JSON-RPC requests processed
def request_count
  return @request_count
end

Class Method Details

.debug=(value) ⇒ Object

Parameters:

  • value (Boolean)

    Whether to enable debug messages



272
273
274
# File 'lib/cosmos/io/json_drb.rb', line 272

# Stores the debug flag in the @@debug class variable, which .debug? reads
# back.
#
# @param value [Boolean] Whether to enable debug messages
def self.debug=(value)
  @@debug = value
end

.debug? ⇒ Boolean

Returns Whether debug messages are enabled.

Returns:

  • (Boolean)

    Whether debug messages are enabled



267
268
269
# File 'lib/cosmos/io/json_drb.rb', line 267

# @return [Boolean] Whether debug messages are enabled, i.e. the value last
#   stored through .debug=
def self.debug?
  return @@debug
end

.get_at_least_x_bytes_of_data(socket, current_data, required_num_bytes, pipe_reader) ⇒ Object

Parameters:

  • socket (Socket)

    The socket to the client

  • current_data (String)

    Binary data read from the socket

  • required_num_bytes (Integer)

    The minimum number of bytes to read

  • pipe_reader (IO.pipe)

    Used to break out of select before returning



220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
# File 'lib/cosmos/io/json_drb.rb', line 220

# Reads from the socket, appending to current_data in place, until it holds
# at least required_num_bytes or the wake-up pipe becomes readable.
#
# When pipe_reader is signalled the method returns with whatever has been
# buffered so far, so callers must re-check current_data.length afterwards
# (as .receive_message does). Previously the select result was discarded and
# a wake-up with the socket still open made this loop spin without returning.
#
# @param socket [Socket] The socket to the client
# @param current_data [String] Binary data read from the socket
# @param required_num_bytes [Integer] The minimum number of bytes to read
# @param pipe_reader [IO.pipe] Used to break out of select before returning
# @raise [EOFError] if the peer closes the connection
# @return [nil]
def self.get_at_least_x_bytes_of_data(socket, current_data, required_num_bytes, pipe_reader)
  while current_data.length < required_num_bytes
    if FAST_READ
      data = socket.read_nonblock(65535, exception: false)
      raise EOFError, 'end of file reached' unless data
      if data == :wait_readable
        # NOTE(review): assumes IO.fast_select (defined outside this file)
        # returns the same [readable, writable, errored] triple as IO.select
        ready, = IO.fast_select([socket, pipe_reader], nil, nil, nil)
        return if ready && ready.include?(pipe_reader)
      else
        current_data << data
      end
    else
      begin
        current_data << socket.read_nonblock(65535)
      rescue IO::WaitReadable
        ready, = IO.fast_select([socket, pipe_reader], nil, nil, nil)
        return if ready && ready.include?(pipe_reader)
      end
    end
  end
end

.receive_message(socket, data, pipe_reader) ⇒ String

Returns The request message.

Parameters:

  • socket (Socket)

    The socket to the client

  • data (String)

    Binary data which has already been read from the socket.

  • pipe_reader (IO.pipe)

    Used to break out of select

Returns:

  • (String)

    The request message



196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/cosmos/io/json_drb.rb', line 196

# Extracts one length-prefixed message from the stream. A message is a 4 byte
# big-endian length ('N') followed by that many bytes of payload. Consumed
# bytes are removed from data in place; any surplus stays in data for the
# next call.
#
# A zero-length frame yields an empty String. Previously the payload was cut
# with data[0..(length - 1)], which for length 0 is data[0..-1]: the whole
# buffer was returned and nothing was consumed.
#
# @param socket [Socket] The socket to the client
# @param data [String] Binary data which has already been read from the socket.
# @param pipe_reader [IO.pipe] Used to break out of select
# @return [String, nil] The request message, or nil if the read helper
#   returned before enough bytes were buffered
def self.receive_message(socket, data, pipe_reader)
  self.get_at_least_x_bytes_of_data(socket, data, 4, pipe_reader)
  return nil if data.length < 4

  # Strip the header off the front of the buffer and decode it
  length = data.slice!(0, 4).unpack('N'.freeze)[0]

  self.get_at_least_x_bytes_of_data(socket, data, length, pipe_reader)
  return nil if data.length < length

  # slice! both returns the payload and drops it from the buffer
  return data.slice!(0, length)
end

.send_data(socket, data, send_timeout = 10.0) ⇒ Object

Parameters:

  • socket (Socket)

    The socket to the client

  • data (String)

    Binary data to send to the socket

  • send_timeout (Float) (defaults to: 10.0)

    The number of seconds to wait for the send to complete



244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
# File 'lib/cosmos/io/json_drb.rb', line 244

# Sends data to the socket framed as a 4 byte big-endian length ('N')
# followed by the payload.
#
# All sizes are counted in bytes. Previously the header and the completion
# check used String#length and character indexing while write_nonblock
# reports bytes, so a payload containing multibyte characters got a wrong
# length header and mis-sliced retries.
#
# @param socket [Socket] The socket to the client
# @param data [String] Binary data to send to the socket (not modified)
# @param send_timeout [Float] The number of seconds to wait for the send to
#   complete
# @raise [Timeout::Error] if the socket does not become writable within
#   send_timeout seconds
# @return [nil]
def self.send_data(socket, data, send_timeout = 10.0)
  # Work on a binary copy so the header and payload concatenate byte-for-byte
  payload = data.dup.force_encoding(Encoding::BINARY)
  frame = [payload.bytesize].pack('N'.freeze) << payload
  num_bytes_to_send = frame.bytesize
  total_bytes_sent = 0

  while total_bytes_sent < num_bytes_to_send
    begin
      remaining = frame.byteslice(total_bytes_sent, num_bytes_to_send - total_bytes_sent)
      total_bytes_sent += socket.write_nonblock(remaining)
    rescue Errno::EAGAIN, Errno::EWOULDBLOCK
      # Socket buffer is full: wait until writable, then the loop tries again
      unless IO.fast_select(nil, [socket], nil, send_timeout)
        raise Timeout::Error, "Send Timeout"
      end
    end
  end
end

Instance Method Details

#add_request_time(request_time) ⇒ Object

Adds a request time to the list. A request time consists of the amount of time to receive the request, process it, and send the response. These times are used by the #average_request_time method to calculate an average request time.

Parameters:

  • request_time (Float)

    Time in seconds for the data transmission



172
173
174
175
176
177
178
179
# File 'lib/cosmos/io/json_drb.rb', line 172

# Records one request duration in a 100-slot ring buffer. Durations below
# MINIMUM_REQUEST_TIME are stored as MINIMUM_REQUEST_TIME. The stored values
# are what #average_request_time averages.
#
# @param request_time [Float] Time in seconds for the data transmission
def add_request_time(request_time)
  sample = request_time < MINIMUM_REQUEST_TIME ? MINIMUM_REQUEST_TIME : request_time
  @request_mutex.synchronize do
    @request_times[@request_times_index] = sample
    @request_times_index += 1
    # Wrap around so only the latest 100 samples are retained
    @request_times_index = 0 unless @request_times_index < 100
  end
end

#average_request_time ⇒ Float

Returns The average time in seconds for a JSON DRb request to be processed and the response sent.

Returns:

  • (Float)

    The average time in seconds for a JSON DRb request to be processed and the response sent.



183
184
185
186
187
188
189
# File 'lib/cosmos/io/json_drb.rb', line 183

# Averages the samples recorded by #add_request_time while holding
# @request_mutex.
# NOTE(review): Array#mean is not core Ruby; presumably supplied by a COSMOS
# core extension loaded elsewhere.
#
# @return [Float] The average time in seconds for a JSON DRb request to be
#   processed and the response sent.
def average_request_time
  @request_mutex.synchronize { @request_times.mean }
end

#graceful_kill ⇒ Object

Gracefully kill the thread



93
94
95
# File 'lib/cosmos/io/json_drb.rb', line 93

# Gracefully kill the thread: writes a single '.' to @thread_writer, the pipe
# end the listen thread selects on in #start_service. Does nothing when no
# listen thread is set.
def graceful_kill
  return unless @thread

  @thread_writer.write('.')
end

#num_clients ⇒ Integer

Returns the number of connected clients

Returns:

  • (Integer)

    The number of connected clients



58
59
60
# File 'lib/cosmos/io/json_drb.rb', line 58

# @return [Integer] The number of connected clients, taken as the number of
#   entries in @client_threads
def num_clients
  return @client_threads.size
end

#start_service(hostname = nil, port = nil, object = nil) ⇒ Object

Parameters:

  • hostname (String) (defaults to: nil)

    The host to start the service on

  • port (Integer) (defaults to: nil)

    The port number to listen for connections

  • object (Object) (defaults to: nil)

    The object to send the DRb requests to. This object must either include the Cosmos::Script module or be the CmdTlmServer.



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/cosmos/io/json_drb.rb', line 102

# Starts the JSON DRb server: opens a listening TCP socket and spawns the
# thread that accepts client connections. Called with no arguments it does
# nothing (client use).
#
# @param hostname [String] The host to start the service on
# @param port [Integer] The port number to listen for connections
# @param object [Object] The object to send the DRb requests to. This object
#   must either include the Cosmos::Script module or be the CmdTlmServer.
# @raise [RuntimeError] if only some of the three parameters are given, or if
#   the port is already in use
# @return [Thread, nil] the listen thread, or nil when no parameters are given
def start_service(hostname = nil, port = nil, object = nil)
  unless hostname && port && object
    # A partial argument list is a caller error; an empty one means client use
    raise "0 or 3 parameters must be given" if hostname || port || object
    return
  end

  @thread_reader, @thread_writer = IO.pipe
  @object = object
  if hostname.to_s.upcase == 'LOCALHOST'.freeze
    hostname = '127.0.0.1'.freeze
  end

  # Create a socket to accept connections from clients
  begin
    @listen_socket = TCPServer.new(hostname, port)
    unless Kernel.is_windows?
      @listen_socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, 1)
    end
  rescue Errno::EADDRINUSE
    # The address in use error is pretty typical if an existing
    # CmdTlmServer is running so explicitly rescue this
    raise "Error binding to port #{port}.\n" \
          "Either another application is using this port\n" \
          "or the operating system is being slow cleaning up.\n" \
          "Make sure all sockets/streams are closed in all applications,\n" \
          "wait 1 minute and try again."
  rescue => bind_error
    # Something else went wrong which is fatal
    Logger.error "JsonDRb listen thread unable to be created.\n#{bind_error.formatted}"
    Cosmos.handle_fatal_exception(bind_error)
  end

  # Start the listen thread which accepts connections
  @thread = Thread.new do
    begin
      while true
        begin
          client = @listen_socket.accept_nonblock
        rescue Errno::EAGAIN, Errno::ECONNABORTED, Errno::EINTR, Errno::EWOULDBLOCK
          # Nothing to accept yet: wait for a connection or a kill request
          readable, = IO.select([@listen_socket, @thread_reader])
          # Data on the pipe means the thread should be killed
          break if readable && readable.include?(@thread_reader)
          retry
        end

        if !@acl || @acl.allow_socket?(client)
          # Create new thread for new connection
          create_client_thread(client)
        else
          # Rejected by the access control list
          Cosmos.close_socket(client)
        end
      end
    rescue Exception => fatal
      Logger.error "JsonDRb listen thread unexpectedly died.\n#{fatal.formatted}"
      Cosmos.handle_fatal_exception(fatal)
    end
  end
end

#stop_service ⇒ Object

Stops the DRb service by closing the socket and the processing thread



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/cosmos/io/json_drb.rb', line 63

# Stops the DRb service by closing the socket and the processing thread, then
# closes every client socket, signals every client pipe, kills every client
# thread and empties the client bookkeeping lists.
def stop_service
  # Listener first: kill the listen thread, then close its socket
  Cosmos.kill_thread(self, @thread)
  @thread = nil
  Cosmos.close_socket(@listen_socket)
  @listen_socket = nil

  # Snapshot the client threads while holding the lock
  threads_to_kill = @client_mutex.synchronize do
    @client_sockets.each { |sock| Cosmos.close_socket(sock) }
    @client_pipe_writers.each { |pipe_writer| pipe_writer.write('.') }
    @client_threads.clone
  end

  # This cannot be inside of the client_mutex or the threads will not
  # be able to shutdown because they will stick on the client_mutex
  threads_to_kill.each { |doomed| Cosmos.kill_thread(self, doomed) }

  @client_mutex.synchronize do
    @client_threads.clear
    @client_sockets.clear
    @client_pipe_writers.clear
  end
end

#thread ⇒ Thread

Returns The server thread listening for incoming requests.

Returns:

  • (Thread)

    The server thread listening for incoming requests



162
163
164
# File 'lib/cosmos/io/json_drb.rb', line 162

# @return [Thread] The server thread listening for incoming requests (nil
#   when the service is not running)
def thread
  return @thread
end