Class: Cosmos::JsonDRb

Inherits:
Object show all
Defined in:
lib/cosmos/io/json_drb.rb

Overview

JsonDRb implements the JSON-RPC 2.0 Specification to provide an interface for both internal and external tools to access the COSMOS server. It provides methods to install an access control list to control access to the API. It also limits the available methods to a known list of allowable API methods.

Constant Summary collapse

MINIMUM_REQUEST_TIME =
0.0001
@@debug =
false

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ JsonDRb



40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/cosmos/io/json_drb.rb', line 40

# Builds an idle JsonDRb. Nothing is listening yet: the listen socket,
# listen thread, access control list, served object and method whitelist
# all start out as nil, and the request statistics start out empty.
def initialize
  # Service state - nothing is set up at construction time
  @listen_socket, @thread = nil, nil
  @acl, @object, @method_whitelist = nil, nil, nil

  # Request statistics and the mutex guarding the request time list
  @request_count, @request_times, @request_times_index = 0, [], 0
  @request_mutex = Mutex.new
  @num_clients = 0

  # Pipe whose read end is watched by the listen thread so that it can be
  # woken up by writing a byte to the write end
  pipe_reader, pipe_writer = IO.pipe
  @thread_reader = pipe_reader
  @thread_writer = pipe_writer
end

Instance Attribute Details

#acl ⇒ ACL



38
39
40
# File 'lib/cosmos/io/json_drb.rb', line 38

# @return [ACL, nil] the access control list consulted when a client
#   connects, or nil when none is installed (the constructor sets it to nil)
def acl
  @acl
end

#method_whitelist ⇒ Array<String>



36
37
38
# File 'lib/cosmos/io/json_drb.rb', line 36

# @return [Array<String>, nil] the list of allowable API method names, or
#   nil when no list has been set (the constructor sets it to nil)
def method_whitelist
  @method_whitelist
end

#num_clients ⇒ Integer



34
35
36
# File 'lib/cosmos/io/json_drb.rb', line 34

# @return [Integer] the value of @num_clients, which starts at 0.
#   Presumably the number of currently connected clients - the code that
#   updates it is not part of this listing (TODO confirm)
def num_clients
  @num_clients
end

#request_count ⇒ Integer



32
33
34
# File 'lib/cosmos/io/json_drb.rb', line 32

# @return [Integer] the value of @request_count, which starts at 0.
#   Presumably the number of requests handled so far - the code that
#   updates it is not part of this listing (TODO confirm)
def request_count
  @request_count
end

Class Method Details

.debug=(value) ⇒ Object



242
243
244
# File 'lib/cosmos/io/json_drb.rb', line 242

# Sets the debug flag. The flag is stored in the class variable @@debug, so
# it is shared by every JsonDRb instance.
#
# @param value [Boolean] new debug setting (presumably true/false, matching
#   the Boolean result documented for debug? - TODO confirm)
def self.debug=(value)
  @@debug = value
end

.debug? ⇒ Boolean



237
238
239
# File 'lib/cosmos/io/json_drb.rb', line 237

# @return [Boolean] the current value of the class-wide debug flag @@debug
def self.debug?
  @@debug
end

.get_at_least_x_bytes_of_data(socket, current_data, required_num_bytes) ⇒ Object



194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/cosmos/io/json_drb.rb', line 194

# Reads from the socket until the buffer holds at least the requested number
# of bytes, or until the peer closes the connection.
#
# The buffer is modified in place. When the connection is closed the buffer
# is emptied, so callers detect the condition by finding fewer bytes than
# they asked for.
#
# @param socket [#recv_nonblock] socket to read from
# @param current_data [String] buffer of bytes received so far; appended to
#   in place, or cleared when the connection is closed
# @param required_num_bytes [Integer] minimum number of bytes the buffer
#   must hold before returning
# @return [nil]
def self.get_at_least_x_bytes_of_data(socket, current_data, required_num_bytes)
  # The requirement is expressed in bytes, so count bytes rather than
  # characters in case the caller's buffer is not a binary string
  while current_data.bytesize < required_num_bytes
    begin
      data = socket.recv_nonblock(65535)
    rescue Errno::EAGAIN, Errno::EWOULDBLOCK
      # Nothing available yet - wait (without timeout) until readable
      IO.fast_select([socket], nil, nil, nil)
      retry
    end

    # A closed connection is reported as an empty string by older Rubies and
    # as nil by Ruby 3.3 and later - handle both
    if data.nil? || data.empty?
      current_data.replace('')
      return
    end
    current_data << data
  end
end

.receive_message(socket, data) ⇒ String



171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/cosmos/io/json_drb.rb', line 171

# Receives one length-prefixed message from the socket.
#
# A message on the wire is a 4-byte big-endian byte count followed by that
# many bytes of payload. Bytes belonging to the message are removed from the
# buffer; any extra bytes already received stay in the buffer for the next
# call.
#
# @param socket [#recv_nonblock] socket to read from
# @param data [String] buffer of bytes received but not yet consumed;
#   modified in place
# @return [String, nil] the message payload (an empty string for a
#   zero-length message), or nil if a complete message could not be obtained
def self.receive_message(socket, data)
  # Read and strip the 4-byte length header
  self.get_at_least_x_bytes_of_data(socket, data, 4)
  return nil if data.bytesize < 4

  length = data.byteslice(0, 4).unpack('N')[0]
  data.replace(data.byteslice(4..-1))

  # Read the payload itself
  self.get_at_least_x_bytes_of_data(socket, data, length)
  return nil if data.bytesize < length

  # Slice by start and length rather than with a 0..(length - 1) range: for
  # a zero-length message that range becomes 0..-1, which selects the whole
  # buffer instead of nothing
  message = data.byteslice(0, length)
  data.replace(data.byteslice(length..-1))
  message
end

.send_data(socket, data, send_timeout = 10.0) ⇒ Object



214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# File 'lib/cosmos/io/json_drb.rb', line 214

# Sends one length-prefixed message over the socket.
#
# The message is written as a 4-byte big-endian byte count followed by the
# payload. Writes are non-blocking; whenever the socket cannot accept more
# data the method waits up to send_timeout seconds for it to become writable
# again.
#
# @param socket [#write_nonblock] socket to write to
# @param data [String] payload to send; not modified
# @param send_timeout [Float] seconds to wait each time a write would block
# @return [nil]
# @raise [Timeout::Error] if the socket does not become writable in time
def self.send_data(socket, data, send_timeout = 10.0)
  # Work on a binary copy so that the length prefix and the resend offset
  # are both measured in bytes. Character counts differ from byte counts
  # for multibyte (e.g. UTF-8) payloads, while write_nonblock reports bytes.
  payload = data.dup.force_encoding('ASCII-8BIT')
  data_to_send = [payload.bytesize].pack('N') << payload
  num_bytes_to_send = data_to_send.bytesize
  total_bytes_sent = 0

  while total_bytes_sent < num_bytes_to_send
    begin
      total_bytes_sent += socket.write_nonblock(data_to_send.byteslice(total_bytes_sent..-1))
    rescue Errno::EAGAIN, Errno::EWOULDBLOCK
      # Socket buffer is full - wait for room, then try the same write again
      result = IO.fast_select(nil, [socket], nil, send_timeout)
      raise Timeout::Error, "Send Timeout" unless result
      retry
    end
  end
end

Instance Method Details

#add_request_time(request_time) ⇒ Object

Adds a request time to the list. A request time consists of the amount of time to receive the request, process it, and send the response. These times are used by the #average_request_time method to calculate an average request time.



148
149
150
151
152
153
154
155
# File 'lib/cosmos/io/json_drb.rb', line 148

# Records how long a request took. Times are kept in a fixed ring of 100
# slots, so once the ring is full the oldest entries are overwritten. Values
# below MINIMUM_REQUEST_TIME are recorded as MINIMUM_REQUEST_TIME.
#
# @param request_time [Float] time taken by the request
def add_request_time(request_time)
  @request_mutex.synchronize do
    # Never record less than the minimum request time
    recorded = request_time < MINIMUM_REQUEST_TIME ? MINIMUM_REQUEST_TIME : request_time
    @request_times[@request_times_index] = recorded

    # Advance to the next slot, wrapping back to the start after slot 99
    @request_times_index += 1
    @request_times_index = 0 if @request_times_index >= 100
  end
end

#average_request_time ⇒ Float



159
160
161
162
163
164
165
# File 'lib/cosmos/io/json_drb.rb', line 159

# Computes the mean of the recorded request times while holding the request
# mutex, so the list cannot change during the calculation.
#
# @return [Float] the mean of the recorded request times, as computed by
#   Array#mean (not a core Ruby method - presumably provided elsewhere in
#   the project, which also decides the result for an empty list)
def average_request_time
  @request_mutex.synchronize { @request_times.mean }
end

#graceful_kill ⇒ Object

Gracefully kill the thread



63
64
65
# File 'lib/cosmos/io/json_drb.rb', line 63

# Asks the listen thread to stop by writing a single byte to the pipe it
# watches. Does nothing when no listen thread exists.
def graceful_kill
  return unless @thread

  @thread_writer.write('.')
end

#start_service(hostname = nil, port = nil, object = nil) ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/cosmos/io/json_drb.rb', line 72

# Starts the JSON DRb server, or does nothing when called without arguments.
#
# When hostname, port and object are all given, a TCP listen socket is bound
# and a listen thread is started. The thread accepts connections, closes the
# ones rejected by the access control list (if one is installed) and hands
# the rest to create_client_thread. Writing a byte to the internal pipe makes
# the thread leave its accept loop.
#
# @param hostname [String] address to bind to; 'localhost' in any letter
#   case is replaced by '127.0.0.1'
# @param port [Integer] port to bind to (presumably an Integer - TODO confirm)
# @param object [Object] stored in @object; presumably the object that
#   receives the API calls (TODO confirm)
# @raise [RuntimeError] if the port is already in use, or if only some of
#   the three parameters are given
def start_service(hostname = nil, port = nil, object = nil)
  if hostname and port and object
    @object = object
    hostname = '127.0.0.1' if (hostname.to_s.upcase == 'LOCALHOST')

    # Create a socket to accept connections from clients
    begin
      @listen_socket = TCPServer.new(hostname, port)
      @listen_socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, 1) unless Kernel.is_windows?
    # The address in use error is pretty typical if an existing
    # CmdTlmServer is running so explicitly rescue this
    rescue Errno::EADDRINUSE
      raise "Error binding to port #{port}.\n" +
            "Either another application is using this port\n" +
            "or the operating system is being slow cleaning up.\n" +
            "Make sure all sockets/streams are closed in all applications,\n" +
            "wait 1 minute and try again."
    # Something else went wrong which is fatal
    rescue => error
      Logger.error "JsonDRb listen thread unable to be created.\n#{error.formatted}"
      Cosmos.handle_fatal_exception(error)
    end

    # Start the listen thread which accepts connections
    @thread = Thread.new do
      begin
        while true
          begin
            socket = @listen_socket.accept_nonblock
          rescue Errno::EAGAIN, Errno::ECONNABORTED, Errno::EINTR, Errno::EWOULDBLOCK
            # No connection pending - block until either a client connects or
            # a byte arrives on the pipe
            read_ready, _ = IO.select([@listen_socket, @thread_reader])
            if read_ready and read_ready.include?(@thread_reader)
              begin
                # Thread should be killed - Cleanout thread_reader first
                # Don't let this break anything else though
                @thread_reader.read(1)
              rescue Exception
                # Oh well - create a clean pipe in case we need one
                @thread_reader, @thread_writer = IO.pipe
              end
              # Leave the accept loop, which ends the listen thread
              break
            else
              # The listen socket is readable - try the accept again
              retry
            end
          end

          # Refuse connections the access control list does not allow
          if @acl and !@acl.allow_socket?(socket)
            socket.close
            next
          end
          # Create new thread for new connection
          create_client_thread(socket)
        end
      rescue Exception => error
        Logger.error "JsonDRb listen thread unexpectedly died.\n#{error.formatted}"
        Cosmos.handle_fatal_exception(error)
      end
    end
  elsif hostname or port or object
    # Only some of the three parameters were supplied
    raise "0 or 3 parameters must be given"
  else
    # Client - Noop
  end
end

#stop_service ⇒ Object

Stops the DRb service by closing the socket and the processing thread



55
56
57
58
59
60
# File 'lib/cosmos/io/json_drb.rb', line 55

# Shuts the service down: the listen thread is killed first, then the listen
# socket is closed if it is still open. Both references are cleared
# afterwards.
def stop_service
  Cosmos.kill_thread(self, @thread)
  @thread = nil

  listener = @listen_socket
  if listener && !listener.closed?
    listener.close
  end
  @listen_socket = nil
end

#thread ⇒ Thread



138
139
140
# File 'lib/cosmos/io/json_drb.rb', line 138

# @return [Thread, nil] the listen thread created by start_service, or nil
#   when the service has not been started or has been stopped
def thread
  @thread
end