Class: Cosmos::TcpipServer

Inherits:
Object
Defined in:
lib/cosmos/io/tcpip_server.rb

Overview

TCP/IP server that can read and write on a single port or on two independent ports. A listen thread is set up that waits for client connections. For each connection to the read port, a thread is spawned that calls the read method of the stream protocol. This data is then available by calling the TcpipServer read method. For each connection to the write port, a thread is spawned that calls the write method of the stream protocol when data is sent to the TcpipServer via the write method.
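
The threading model described above can be sketched with Ruby's standard library alone. The class below is a hypothetical, simplified stand-in: the name TinyReadServer, the newline-delimited messages, and the use of port 0 for an OS-assigned port are all assumptions of this sketch. It is not the TcpipServer implementation, and it leaves out stream protocols, the write port, and error logging.

```ruby
require 'socket'

# Hypothetical, simplified stand-in for the read side of the pattern:
# one listen thread accepts clients, one read thread per client feeds a queue.
class TinyReadServer
  attr_reader :port

  def initialize
    @listen_socket = TCPServer.new('127.0.0.1', 0) # port 0: the OS picks a free port
    @port = @listen_socket.addr[1]
    @read_queue = Queue.new
    @read_threads = []
    @listen_thread = nil
  end

  # Start the listen thread; each accepted client gets its own read thread.
  def connect
    @listen_thread = Thread.new do
      loop do
        client = @listen_socket.accept
        @read_threads << Thread.new(client) do |socket|
          begin
            # Newline-delimited messages stand in for a real stream protocol
            while (line = socket.gets)
              @read_queue << line.chomp
            end
          ensure
            socket.close
          end
        end
      end
    end
  end

  # Blocks until some client has sent a message.
  def read
    @read_queue.pop
  end

  # Stop the listen thread, then the read threads, then close the listen socket.
  def disconnect
    if @listen_thread
      @listen_thread.kill
      @listen_thread.join
    end
    @read_threads.each do |thread|
      thread.kill
      thread.join
    end
    @read_threads.clear
    @listen_socket.close
  end
end

server = TinyReadServer.new
server.connect
client = TCPSocket.new('127.0.0.1', server.port)
client.puts('hello')
message = server.read # blocks until the read thread has queued the line
client.close
server.disconnect
```

The queue decouples the per-client read threads from whoever calls read, which is why a single blocking read call can serve any number of connected clients.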

Constructor Details

#initialize(write_port, read_port, write_timeout, read_timeout, stream_protocol_type, *stream_protocol_args) ⇒ TcpipServer

Returns a new instance of TcpipServer.

Parameters:

  • write_port (Integer)

    The server write port. Clients should connect and expect to receive data from this port.

  • read_port (Integer)

    The server read port. Clients should connect and expect to send data to this port.

  • write_timeout (Float|nil)

    The number of seconds to wait for the write to complete. Pass nil to block until the write is complete.

  • read_timeout (Float|nil)

    The number of seconds to wait for the read to complete. Pass nil to block until the read is complete.

  • stream_protocol_type (String)

    The name of the stream protocol to use for both the read and write ports. This name is combined with 'StreamProtocol' to result in a COSMOS StreamProtocol class.

  • stream_protocol_args (Array)

    The arguments to pass to the StreamProtocol class constructor
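
To make the naming rule for stream_protocol_type concrete, here is a small sketch. The capitalize expression is the one used in the constructor; the class_name_to_filename helper is a hypothetical approximation of the COSMOS String extension of the same name, written here only so the example runs on its own.

```ruby
# Hypothetical approximation of the COSMOS String#class_name_to_filename
# extension: CamelCase class name to snake_case file name.
def class_name_to_filename(class_name)
  class_name.gsub(/([a-z0-9])([A-Z])/, '\1_\2').downcase + '.rb'
end

def stream_protocol_class_name(stream_protocol_type)
  # Same expression the constructor uses to build the class name
  stream_protocol_type.to_s.capitalize << 'StreamProtocol'
end

burst = stream_protocol_class_name('burst')       # => "BurstStreamProtocol"
upper = stream_protocol_class_name('BURST')       # => "BurstStreamProtocol"
mixed = stream_protocol_class_name('FixedLength') # => "FixedlengthStreamProtocol"
burst_file = class_name_to_filename(burst)        # => "burst_stream_protocol.rb"
```

Because String#capitalize lowercases everything after the first character, a mixed-case type name such as 'FixedLength' does not survive intact; single-word names are the safe choice.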



# File 'lib/cosmos/io/tcpip_server.rb', line 57

def initialize(write_port,
               read_port,
               write_timeout,
               read_timeout,
               stream_protocol_type,
               *stream_protocol_args)
  @write_port = ConfigParser.handle_nil(write_port)
  @write_port = Integer(write_port) if @write_port
  @read_port = ConfigParser.handle_nil(read_port)
  @read_port = Integer(read_port) if @read_port
  @write_timeout = ConfigParser.handle_nil(write_timeout)
  @write_timeout = @write_timeout.to_f if @write_timeout
  @read_timeout = ConfigParser.handle_nil(read_timeout)
  @read_timeout = @read_timeout.to_f if @read_timeout

  stream_protocol_class = stream_protocol_type.to_s.capitalize << 'StreamProtocol'
  @stream_protocol_class = Cosmos.require_class(stream_protocol_class.class_name_to_filename)
  @stream_protocol_args = stream_protocol_args

  @listen_sockets = []
  @listen_pipes = []
  @listen_threads = []
  @read_threads = []
  @write_stream_protocols = []
  @read_stream_protocols = []
  @write_queue = nil
  @write_queue = Queue.new if @write_port
  @read_queue = nil
  @read_queue = Queue.new if @read_port
  @write_mutex = nil
  @write_mutex = Mutex.new if @write_port
  @write_condition_variable = nil
  @write_condition_variable = ConditionVariable.new if @write_port
  @write_connection_callback = nil
  @read_connection_callback = nil
  @bytes_read = 0
  @bytes_written = 0
  @raw_logger_pair = nil
  @raw_logging_enabled = false
  @interface = nil
  @connection_mutex = Mutex.new
  @listen_address = Socket::INADDR_ANY

  @connected = false
end
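
The argument handling at the top of the constructor can be sketched as follows. The handle_nil helper here is a hypothetical stand-in for ConfigParser.handle_nil; it is assumed to turn the strings 'nil' and 'null' (in any case) into nil and to return every other value unchanged. The normalize_port and normalize_timeout names are illustrative only.

```ruby
# Hypothetical stand-in for ConfigParser.handle_nil (assumed behaviour).
def handle_nil(value)
  return nil if value.is_a?(String) && %w[NIL NULL].include?(value.upcase)

  value
end

# Ports become Integers, or nil when the port is disabled
def normalize_port(port)
  port = handle_nil(port)
  port ? Integer(port) : nil
end

# Timeouts become Floats, or nil to block indefinitely
def normalize_timeout(timeout)
  timeout = handle_nil(timeout)
  timeout ? timeout.to_f : nil
end

ports = [normalize_port('8080'), normalize_port(8081), normalize_port('nil'), normalize_port(nil)]
timeouts = [normalize_timeout('10'), normalize_timeout(2), normalize_timeout('NULL')]
```

As the listing above shows, a nil port also means the matching queue is never created, so read returns nil immediately and write does nothing for a server built without that port.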

Instance Attribute Details

#bytes_read ⇒ Object

The number of bytes read from all connected sockets



# File 'lib/cosmos/io/tcpip_server.rb', line 35

def bytes_read
  @bytes_read
end

#bytes_written ⇒ Object

The number of bytes written to the TcpipServer. This number does not vary with the number of clients connected to the write port.



# File 'lib/cosmos/io/tcpip_server.rb', line 38

def bytes_written
  @bytes_written
end

#listen_address ⇒ String

Returns the IP address to bind to. Defaults to ANY (0.0.0.0).

Returns:

  • (String)

    The IP address to bind to. Defaults to ANY (0.0.0.0).



# File 'lib/cosmos/io/tcpip_server.rb', line 42

def listen_address
  @listen_address
end

#raw_logger_pair ⇒ RawLoggerPair

Returns the RawLoggerPair instance, or nil.

Returns:

  • (RawLoggerPair)

    RawLoggerPair instance or nil

# File 'lib/cosmos/io/tcpip_server.rb', line 40

def raw_logger_pair
  @raw_logger_pair
end

#read_connection_callback ⇒ Object

Callback method to call when a new client connects to the read port. This method will be called with the StreamProtocol as the only argument.



# File 'lib/cosmos/io/tcpip_server.rb', line 33

def read_connection_callback
  @read_connection_callback
end

#write_connection_callback ⇒ Object

Callback method to call when a new client connects to the write port. This method will be called with the StreamProtocol as the only argument.



# File 'lib/cosmos/io/tcpip_server.rb', line 30

def write_connection_callback
  @write_connection_callback
end
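
The two connection callbacks follow the same contract: one argument, the StreamProtocol for the new client. The sketch below illustrates that contract only. It assumes the callback is an object that responds to #call (a lambda here), and FakeStreamProtocol is a hypothetical stand-in; how the server stores and invokes the callback is not shown on this page.

```ruby
# Hypothetical stand-in for a stream protocol object
FakeStreamProtocol = Struct.new(:name)

connected = []

# Assumption: the callback is any object that responds to #call
write_connection_callback = lambda do |stream_protocol|
  connected << stream_protocol.name
end

# What the server is assumed to do when a client connects to the write port
stream_protocol = FakeStreamProtocol.new('client-1')
write_connection_callback.call(stream_protocol) if write_connection_callback
```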

Instance Method Details

#connect ⇒ Object

Create the read and write port listen threads. Incoming connections will spawn separate threads to process the reads and writes.



# File 'lib/cosmos/io/tcpip_server.rb', line 113

def connect
  @cancel_threads = false
  if @read_queue
    # Empty the read queue of any residual
    begin
      @read_queue.pop(true) while @read_queue.length > 0
    rescue
    end
  end
  if @write_port == @read_port
    # Handle one socket case
    start_listen_thread(@read_port, true, true)
  else
    if @write_port
      start_listen_thread(@write_port, true, false)
    end

    if @read_port
      start_listen_thread(@read_port, false, true)
    end
  end

  if @write_port
    # Start write thread
    @write_thread = Thread.new do
      begin
        while true
          write_thread_body()
          break if @cancel_threads
        end
      rescue Exception => err
        @connection_mutex.synchronize do
          @write_stream_protocols.each do |stream_protocol, hostname, host_ip, port|
            stream_protocol.disconnect
            stream_protocol.stream.raw_logger_pair.stop if stream_protocol.stream.raw_logger_pair
          end
          @write_stream_protocols.clear
        end
        Logger.instance.error("Tcpip server write thread unexpectedly died")
        Logger.instance.error(err.formatted)
      end
    end
  else
    @write_thread = nil
  end
  @connected = true
end
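
The port-selection branch in connect is easier to follow as a pure function. The sketch below mirrors that branch structure; the method name listen_plan and the tuple layout [port, listen_write, listen_read] are illustrative, and the flag names are guesses at what the second and third arguments of start_listen_thread mean.

```ruby
# Mirrors the branch structure of #connect. Each tuple is the argument list
# that would be handed to start_listen_thread.
def listen_plan(write_port, read_port)
  # One shared socket handles both directions
  return [[read_port, true, true]] if write_port == read_port

  plan = []
  plan << [write_port, true, false] if write_port
  plan << [read_port, false, true] if read_port
  plan
end

shared = listen_plan(8080, 8080)    # => [[8080, true, true]]
separate = listen_plan(8080, 8081)  # => [[8080, true, false], [8081, false, true]]
read_only = listen_plan(nil, 8081)  # => [[8081, false, true]]
```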

#connected? ⇒ Boolean

Returns whether the server is listening for connections.

Returns:

  • (Boolean)

    Whether the server is listening for connections



# File 'lib/cosmos/io/tcpip_server.rb', line 162

def connected?
  @connected
end

#disconnect ⇒ Object

Shuts down the listener threads for both the read and write ports as well as any client connections. As part of shutting down client connections, the StreamProtocol#disconnect method is called.



# File 'lib/cosmos/io/tcpip_server.rb', line 169

def disconnect
  @cancel_threads = true
  @read_queue << nil if @read_queue
  @listen_pipes.each do |pipe|
    begin
      pipe.write('.')
    rescue Exception
      # Oh well
    end
  end
  @listen_pipes.clear

  # Shutdown Listen Thread(s)
  @listen_threads.each do |listen_thread|
    Cosmos.kill_thread(self, listen_thread)
  end
  @listen_threads.clear

  # Shutdown Listen Socket(s)
  @listen_sockets.each do |listen_socket|
    begin
      Cosmos.close_socket(listen_socket)
    rescue IOError
      # Ok may have been closed by the thread
    end
  end
  @listen_sockets.clear

  # Shutdown Read Stream Protocols - This should unblock read threads
  @connection_mutex.synchronize do
    @read_stream_protocols.each do |stream_protocol, hostname, host_ip, port|
      stream_protocol.disconnect
      stream_protocol.stream.raw_logger_pair.stop if stream_protocol.stream.raw_logger_pair
    end
    @read_stream_protocols.clear
  end

  # Shutdown Read Threads
  @read_threads.each do |thread|
    Cosmos.kill_thread(self, thread)
  end
  @read_threads.clear

  # Shutdown Write Thread
  if @write_thread
    Cosmos.kill_thread(self, @write_thread)
    @write_thread = nil
  end

  # Shutdown Write Stream Protocols
  @connection_mutex.synchronize do
    @write_stream_protocols.each do |stream_protocol, hostname, host_ip, port|
      stream_protocol.disconnect
      stream_protocol.stream.raw_logger_pair.stop if stream_protocol.stream.raw_logger_pair
    end
    @write_stream_protocols.clear
  end

  @connected = false
end
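
The single byte written to each listen pipe at the top of disconnect is a common way to wake a thread that is blocked waiting for connections. The listen thread itself is not shown on this page, so the sketch below assumes it waits with IO.select on both the listen socket and the read end of a pipe; all variable names are illustrative.

```ruby
require 'socket'

listen_socket = TCPServer.new('127.0.0.1', 0) # port 0: the OS picks a free port
pipe_reader, pipe_writer = IO.pipe
woke_by = nil

listen_thread = Thread.new do
  # Blocks until a client connects or a byte arrives on the pipe
  readable, = IO.select([listen_socket, pipe_reader])
  woke_by = readable.include?(pipe_reader) ? :pipe : :socket
end

pipe_writer.write('.') # the same wake-up byte that #disconnect writes
listen_thread.join(5)
[listen_socket, pipe_reader, pipe_writer].each(&:close)
```

Waking the thread this way lets it notice the cancel flag and exit on its own before Cosmos.kill_thread has to intervene.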

#graceful_kill ⇒ Object

Gracefully kill all the threads



# File 'lib/cosmos/io/tcpip_server.rb', line 231

def graceful_kill
  # This method is just here to prevent warnings
end

#interface=(interface) ⇒ Object

Parameters:

  • interface (Interface)

    Sets the higher level interface which is using this TcpipServer. If the interface defines post_read_data, post_read_packet, or pre_write_packet, then these methods will be called over any subclass implementations within the stream protocol.



# File 'lib/cosmos/io/tcpip_server.rb', line 107

def interface=(interface)
  @interface = interface
end

#num_clients ⇒ Integer

Returns the number of connected clients.

Returns:

  • (Integer)

    The number of connected clients



# File 'lib/cosmos/io/tcpip_server.rb', line 284

def num_clients
  clients = []
  @write_stream_protocols.each do |stream_protocol, hostname, host_ip, port|
    clients << [host_ip, port]
  end
  @read_stream_protocols.each do |stream_protocol, hostname, host_ip, port|
    clients << [host_ip, port]
  end
  clients.uniq.length
end
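
The uniq call matters when the same client shows up in both lists, which presumably happens when the read and write ports are the same. A small sketch with made-up entries in the [stream_protocol, hostname, host_ip, port] layout:

```ruby
# Illustrative data only; symbols stand in for stream protocol objects
write_stream_protocols = [
  [:protocol_a, 'host-a', '10.0.0.5', 50_000],
  [:protocol_b, 'host-b', '10.0.0.6', 50_001]
]
read_stream_protocols = [
  [:protocol_a, 'host-a', '10.0.0.5', 50_000] # same client, seen on the read side too
]

clients = (write_stream_protocols + read_stream_protocols).map do |_protocol, _hostname, host_ip, port|
  [host_ip, port]
end
num_clients = clients.uniq.length # => 2
```

Clients are identified by the [host_ip, port] pair, so two connections from the same host on different source ports count as two clients.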

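#read ⇒ Packet

Returns the next queued packet read from any of the connected clients. This method blocks until data is available. It returns nil if no read port is configured or if the server is disconnected while the call is waiting.

Returns:

  • (Packet)

    Next queued packet read from any of the connected clients, or nil. Note this method blocks until data is available.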



# File 'lib/cosmos/io/tcpip_server.rb', line 237

def read
  return nil unless @read_queue
  packet = @read_queue.pop
  return nil unless packet
  @bytes_read += packet.buffer.length
  packet
end
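
Since disconnect pushes nil onto the read queue, a caller can treat a nil return value as the signal to stop reading. The sketch below shows that consumer loop against a plain Queue; strings stand in for Packet objects.

```ruby
read_queue = Queue.new
received = []

reader = Thread.new do
  # Queue#pop blocks until an item is available; nil is the shutdown sentinel
  while (packet = read_queue.pop)
    received << packet
  end
end

read_queue << 'packet 1'
read_queue << 'packet 2'
read_queue << nil # what #disconnect pushes to unblock a waiting reader
reader.join
```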

#read_queue_size ⇒ Integer

Returns the number of packets waiting on the read queue.

Returns:

  • (Integer)

    The number of packets waiting on the read queue



# File 'lib/cosmos/io/tcpip_server.rb', line 266

def read_queue_size
  if @read_queue
    @read_queue.size
  else
    0
  end
end

#start_raw_logging ⇒ Object

Start raw logging for this interface



# File 'lib/cosmos/io/tcpip_server.rb', line 296

def start_raw_logging
  @raw_logging_enabled = true
  if @raw_logger_pair
    @write_stream_protocols.each do |stream_protocol, hostname, host_ip, port|
      stream_protocol.stream.raw_logger_pair.start if stream_protocol.stream.raw_logger_pair
    end
    @read_stream_protocols.each do |stream_protocol, hostname, host_ip, port|
      stream_protocol.stream.raw_logger_pair.start if stream_protocol.stream.raw_logger_pair
    end
  end
end

#stop_raw_logging ⇒ Object

Stop raw logging for this interface



# File 'lib/cosmos/io/tcpip_server.rb', line 309

def stop_raw_logging
  @raw_logging_enabled = false
  if @raw_logger_pair
    @write_stream_protocols.each do |stream_protocol, hostname, host_ip, port|
      stream_protocol.stream.raw_logger_pair.stop if stream_protocol.stream.raw_logger_pair
    end
    @read_stream_protocols.each do |stream_protocol, hostname, host_ip, port|
      stream_protocol.stream.raw_logger_pair.stop if stream_protocol.stream.raw_logger_pair
    end
  end
end

#write(packet) ⇒ Object

Parameters:

  • packet (Packet)

    Packet to write to all clients connected to the write port.



# File 'lib/cosmos/io/tcpip_server.rb', line 247

def write(packet)
  return unless @write_queue
  @write_queue << packet.clone
  @bytes_written += packet.buffer.length
  @write_condition_variable.broadcast
end
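
The write method only queues a copy of the packet and broadcasts on the condition variable; a separate write thread does the sending. The body of that thread is not shown on this page, so the consumer below is an assumed shape: it waits on the condition variable with a timeout and then drains the queue. Strings stand in for packets and the sent array stands in for the client sockets.

```ruby
write_queue = Queue.new
write_mutex = Mutex.new
write_condition_variable = ConditionVariable.new
cancel_threads = false
sent = []

# Assumed shape of the write thread: sleep until signalled, then drain the queue
write_thread = Thread.new do
  until cancel_threads
    write_mutex.synchronize do
      # Timed wait, so a broadcast that fires before the wait cannot stall the loop
      write_condition_variable.wait(write_mutex, 0.1) if write_queue.empty?
    end
    sent << write_queue.pop(true) until write_queue.empty?
  end
end

# Same steps as #write, minus the byte counting: queue a copy, then broadcast
write = lambda do |data|
  write_queue << data.dup
  write_condition_variable.broadcast
end

write.call('first')
write.call('second')

deadline = Time.now + 5
sleep 0.01 until sent.length == 2 || Time.now > deadline
cancel_threads = true
write_thread.join
```

Queuing a copy means the caller can modify or reuse its packet immediately after write returns without affecting what is eventually sent.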

#write_queue_size ⇒ Integer

Returns the number of packets waiting on the write queue.

Returns:

  • (Integer)

    The number of packets waiting on the write queue



# File 'lib/cosmos/io/tcpip_server.rb', line 275

def write_queue_size
  if @write_queue
    @write_queue.size
  else
    0
  end
end

#write_raw(data) ⇒ Object

Parameters:

  • data (String)

    Data to write to all clients connected to the write port.



# File 'lib/cosmos/io/tcpip_server.rb', line 256

def write_raw(data)
  return unless @write_queue
  packet = Packet.new(nil, nil)
  packet.buffer = data
  @write_queue << packet
  @bytes_written += data.length
  @write_condition_variable.broadcast
end
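
One detail worth keeping in mind when passing data to write_raw: the byte counter is incremented by data.length, and String#length counts characters, not bytes. The two agree for binary strings, which is presumably what callers pass, but not for multibyte text. A short plain-Ruby illustration:

```ruby
text = "\u00e9" # one character, two bytes in UTF-8
binary = text.b # same bytes, tagged as binary (ASCII-8BIT)

char_count = text.length     # => 1
byte_count = text.bytesize   # => 2
binary_count = binary.length # => 2
```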