Class: Cosmos::TcpipServer
Overview
TCP/IP server which can both read and write on a single port or on two independent ports. A listen thread is set up which waits for client connections. For each connection to the read port, a thread is spawned that calls the read method from the stream protocol. This data is then available by calling the TcpipServer read method. For each connection to the write port, a thread is spawned that calls the write method from the stream protocol when data is sent to the TcpipServer via the write method.
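The read-port pattern described in the overview can be illustrated with a minimal sketch built only on Ruby's standard library. `MiniReadServer` and all of its method names are hypothetical and are not part of COSMOS; the sketch assumes line-delimited data instead of a stream protocol and binds to loopback on an OS-chosen port.

```ruby
require 'socket'

# Hypothetical miniature of the read-port pattern: one listen thread accepts
# clients, and one thread per client pushes received data onto a shared queue.
class MiniReadServer
  attr_reader :port

  def initialize
    @server = TCPServer.new('127.0.0.1', 0) # port 0 lets the OS pick a free port
    @port = @server.addr[1]
    @read_queue = Queue.new
    @listen_thread = Thread.new { listen_loop }
  end

  # Blocks until a client thread has queued data, similar in spirit to
  # TcpipServer#read.
  def read
    @read_queue.pop
  end

  def shutdown
    @listen_thread.kill
    @server.close
  end

  private

  def listen_loop
    loop do
      client = @server.accept
      # One reader thread per connection
      Thread.new(client) do |socket|
        while (line = socket.gets)
          @read_queue << line.chomp
        end
        socket.close
      end
    end
  end
end

server = MiniReadServer.new
TCPSocket.open('127.0.0.1', server.port) { |socket| socket.puts 'hello' }
puts server.read
server.shutdown
```

The shared queue is what lets a single `read` call return data from whichever client sent it first, without the caller knowing how many connections exist.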
Instance Attribute Summary
- #bytes_read ⇒ Object
  The number of bytes read from all connected sockets.
- #bytes_written ⇒ Object
  The number of bytes written to the TcpipServer.
- #listen_address ⇒ String
  The IP address to bind to.
- #raw_logger_pair ⇒ RawLoggerPair
  RawLoggerPair instance or nil.
- #read_connection_callback ⇒ Object
  Callback method to call when a new client connects to the read port.
- #write_connection_callback ⇒ Object
  Callback method to call when a new client connects to the write port.
Instance Method Summary
- #connect ⇒ Object
  Create the read and write port listen threads.
- #connected? ⇒ Boolean
  Whether the server is listening for connections.
- #disconnect ⇒ Object
  Shuts down the listener threads for both the read and write ports as well as any client connections.
- #graceful_kill ⇒ Object
  Gracefully kill all the threads.
- #initialize(write_port, read_port, write_timeout, read_timeout, stream_protocol_type, *stream_protocol_args) ⇒ TcpipServer (constructor)
  A new instance of TcpipServer.
- #interface=(interface) ⇒ Object
- #num_clients ⇒ Integer
  The number of connected clients.
- #read ⇒ Packet
  Latest packet read from any of the connected clients.
- #read_queue_size ⇒ Integer
  The number of packets waiting on the read queue.
- #start_raw_logging ⇒ Object
  Start raw logging for this interface.
- #stop_raw_logging ⇒ Object
  Stop raw logging for this interface.
- #write(packet) ⇒ Object
- #write_queue_size ⇒ Integer
  The number of packets waiting on the write queue.
- #write_raw(data) ⇒ Object
Constructor Details
#initialize(write_port, read_port, write_timeout, read_timeout, stream_protocol_type, *stream_protocol_args) ⇒ TcpipServer
Returns a new instance of TcpipServer.
# File 'lib/cosmos/io/tcpip_server.rb', line 57

def initialize(write_port, read_port, write_timeout, read_timeout, stream_protocol_type, *stream_protocol_args)
  @write_port = ConfigParser.handle_nil(write_port)
  @write_port = Integer(write_port) if @write_port
  @read_port = ConfigParser.handle_nil(read_port)
  @read_port = Integer(read_port) if @read_port
  @write_timeout = ConfigParser.handle_nil(write_timeout)
  @write_timeout = @write_timeout.to_f if @write_timeout
  @read_timeout = ConfigParser.handle_nil(read_timeout)
  @read_timeout = @read_timeout.to_f if @read_timeout

  stream_protocol_class = stream_protocol_type.to_s.capitalize << 'StreamProtocol'
  @stream_protocol_class = stream_protocol_class.to_class
  unless @stream_protocol_class
    begin
      require "cosmos/streams/#{stream_protocol_class.class_name_to_filename}"
      @stream_protocol_class = stream_protocol_class.to_class
    rescue LoadError => err
      Logger.instance.error "Unable to require #{stream_protocol_class.class_name_to_filename} due to #{err.message}. Ensure #{stream_protocol_class.class_name_to_filename} is in the COSMOS lib directory."
      raise "Unable to require #{stream_protocol_class.class_name_to_filename} due to #{err.message}. Ensure #{stream_protocol_class.class_name_to_filename} is in the COSMOS lib directory."
    end
  end
  @stream_protocol_args = stream_protocol_args

  @listen_sockets = []
  @listen_pipes = []
  @listen_threads = []
  @read_threads = []
  @write_stream_protocols = []
  @read_stream_protocols = []
  @write_queue = nil
  @write_queue = Queue.new if @write_port
  @read_queue = nil
  @read_queue = Queue.new if @read_port
  @write_mutex = nil
  @write_mutex = Mutex.new if @write_port
  @write_condition_variable = nil
  @write_condition_variable = ConditionVariable.new if @write_port
  @write_connection_callback = nil
  @read_connection_callback = nil
  @bytes_read = 0
  @bytes_written = 0
  @raw_logger_pair = nil
  @raw_logging_enabled = false
  @interface = nil
  @connection_mutex = Mutex.new
  @listen_address = Socket::INADDR_ANY
  @connected = false
end
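The constructor derives the stream protocol class name from `stream_protocol_type` before trying to load it. The sketch below reproduces that naming step in plain Ruby. Both helper names are hypothetical, and the second helper only approximates the COSMOS `class_name_to_filename` string extension (assumed here to be a CamelCase to snake_case conversion; whether the real method also appends a file extension is not shown in this listing).

```ruby
# Hypothetical helper mirroring how the constructor builds the protocol class
# name: capitalize the type, then append 'StreamProtocol'.
def stream_protocol_class_name(stream_protocol_type)
  stream_protocol_type.to_s.capitalize << 'StreamProtocol'
end

# Assumed stand-in for the COSMOS class_name_to_filename extension:
# CamelCase to snake_case, without a file extension.
def snake_case_filename(class_name)
  class_name.gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase
end

name = stream_protocol_class_name(:burst)
puts name
puts snake_case_filename(name)
```

Because `String#capitalize` also lowercases everything after the first character, a type given as `'LENGTH'` or `'length'` resolves to the same class name.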
Instance Attribute Details
#bytes_read ⇒ Object
The number of bytes read from all connected sockets.
# File 'lib/cosmos/io/tcpip_server.rb', line 35

def bytes_read
  @bytes_read
end
#bytes_written ⇒ Object
The number of bytes written to the TcpipServer. This number does not vary with the number of clients connected to the write port.
# File 'lib/cosmos/io/tcpip_server.rb', line 38

def bytes_written
  @bytes_written
end
#listen_address ⇒ String
Returns the IP address to bind to. Defaults to ANY (0.0.0.0).
# File 'lib/cosmos/io/tcpip_server.rb', line 42

def listen_address
  @listen_address
end
#raw_logger_pair ⇒ RawLoggerPair
Returns the RawLoggerPair instance or nil.
# File 'lib/cosmos/io/tcpip_server.rb', line 40

def raw_logger_pair
  @raw_logger_pair
end
#read_connection_callback ⇒ Object
Callback method to call when a new client connects to the read port. This method will be called with the StreamProtocol as the only argument.
# File 'lib/cosmos/io/tcpip_server.rb', line 33

def read_connection_callback
  @read_connection_callback
end
#write_connection_callback ⇒ Object
Callback method to call when a new client connects to the write port. This method will be called with the StreamProtocol as the only argument.
# File 'lib/cosmos/io/tcpip_server.rb', line 30

def write_connection_callback
  @write_connection_callback
end
Instance Method Details
#connect ⇒ Object
Create the read and write port listen threads. Incoming connections will spawn separate threads to process the reads and writes.
# File 'lib/cosmos/io/tcpip_server.rb', line 123

def connect
  @cancel_threads = false
  if @read_queue
    # Empty the read queue of any residual
    begin
      @read_queue.pop(true) while @read_queue.length > 0
    rescue
    end
  end

  if @write_port == @read_port # Handle one socket case
    start_listen_thread(@read_port, true, true)
  else
    if @write_port
      start_listen_thread(@write_port, true, false)
    end
    if @read_port
      start_listen_thread(@read_port, false, true)
    end
  end

  if @write_port
    # Start write thread
    @write_thread = Thread.new do
      begin
        while true
          write_thread_body()
          break if @cancel_threads
        end
      rescue Exception => err
        @connection_mutex.synchronize do
          @write_stream_protocols.each do |stream_protocol, hostname, host_ip, port|
            stream_protocol.disconnect
            stream_protocol.stream.raw_logger_pair.stop if stream_protocol.stream.raw_logger_pair
          end
          @write_stream_protocols.clear
        end
        Logger.instance.error("Tcpip server write thread unexpectedly died")
        Logger.instance.error(err.formatted)
      end
    end
  else
    @write_thread = nil
  end
  @connected = true
end
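The port handling in `connect` decides how many listen threads are started and what each one serves. The following sketch restates that decision as a pure function so the cases are easy to compare. `listener_plan` is a hypothetical helper, and the names `listen_write` and `listen_read` for the two boolean arguments of `start_listen_thread` are assumptions inferred from the call sites.

```ruby
# Hypothetical helper: returns [port, listen_write, listen_read] triples in the
# same order connect passes them to start_listen_thread.
def listener_plan(write_port, read_port)
  # One socket case: a single listener serves both directions
  return [[read_port, true, true]] if write_port == read_port

  plan = []
  plan << [write_port, true, false] if write_port
  plan << [read_port, false, true] if read_port
  plan
end

p listener_plan(8080, 8080) # shared port
p listener_plan(8080, 8081) # independent ports
p listener_plan(nil, 8081)  # read-only server
```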
#connected? ⇒ Boolean
Returns whether the server is listening for connections.
# File 'lib/cosmos/io/tcpip_server.rb', line 172

def connected?
  @connected
end
#disconnect ⇒ Object
Shuts down the listener threads for both the read and write ports as well as any client connections. As part of shutting down client connections, the StreamProtocol#disconnect method is called.
# File 'lib/cosmos/io/tcpip_server.rb', line 179

def disconnect
  @cancel_threads = true
  @read_queue << nil if @read_queue
  @listen_pipes.each do |pipe|
    begin
      pipe.write('.')
    rescue Exception
      # Oh well
    end
  end
  @listen_pipes.clear

  # Shutdown Listen Thread(s)
  @listen_threads.each do |listen_thread|
    Cosmos.kill_thread(self, listen_thread)
  end
  @listen_threads.clear

  # Shutdown Listen Socket(s)
  @listen_sockets.each do |listen_socket|
    begin
      listen_socket.close unless listen_socket.closed?
    rescue IOError
      # Ok may have been closed by the thread
    end
  end
  @listen_sockets.clear

  # Shutdown Read Stream Protocols - This should unblock read threads
  @connection_mutex.synchronize do
    @read_stream_protocols.each do |stream_protocol, hostname, host_ip, port|
      stream_protocol.disconnect
      stream_protocol.stream.raw_logger_pair.stop if stream_protocol.stream.raw_logger_pair
    end
    @read_stream_protocols.clear
  end

  # Shutdown Read Threads
  @read_threads.each do |thread|
    Cosmos.kill_thread(self, thread)
  end
  @read_threads.clear

  # Shutdown Write Thread
  if @write_thread
    Cosmos.kill_thread(self, @write_thread)
    @write_thread = nil
  end

  # Shutdown Write Stream Protocols
  @connection_mutex.synchronize do
    @write_stream_protocols.each do |stream_protocol, hostname, host_ip, port|
      stream_protocol.disconnect
      stream_protocol.stream.raw_logger_pair.stop if stream_protocol.stream.raw_logger_pair
    end
    @write_stream_protocols.clear
  end

  @connected = false
end
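Writing a single byte to each entry in `@listen_pipes` looks like the common "self-pipe" technique for waking a thread that is blocked in `IO.select`. The listen thread body is not shown in this listing, so that reading is an assumption. The sketch below demonstrates the technique on its own with a hypothetical `start_waiter` helper that watches only the pipe; a real listener would presumably also watch its listen socket.

```ruby
# Hypothetical waiter that blocks in IO.select on the read end of a pipe,
# standing in for a listen thread that needs an interruptible wait.
def start_waiter(wake_reader, events)
  Thread.new do
    ready, = IO.select([wake_reader]) # blocks until the pipe becomes readable
    events << :woken if ready.include?(wake_reader)
  end
end

wake_reader, wake_writer = IO.pipe
events = Queue.new
waiter = start_waiter(wake_reader, events)

wake_writer.write('.') # the same one-byte nudge that disconnect sends
waiter.join
puts events.pop
```

Waking the thread this way gives it a chance to notice a cancel flag and exit on its own before anything has to kill it.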
#graceful_kill ⇒ Object
Gracefully kill all the threads.
# File 'lib/cosmos/io/tcpip_server.rb', line 241

def graceful_kill
  # This method is just here to prevent warnings
end
#interface=(interface) ⇒ Object
# File 'lib/cosmos/io/tcpip_server.rb', line 117

def interface=(interface)
  @interface = interface
end
#num_clients ⇒ Integer
Returns the number of connected clients.
# File 'lib/cosmos/io/tcpip_server.rb', line 294

def num_clients
  clients = []
  @write_stream_protocols.each do |stream_protocol, hostname, host_ip, port|
    clients << [host_ip, port]
  end
  @read_stream_protocols.each do |stream_protocol, hostname, host_ip, port|
    clients << [host_ip, port]
  end
  clients.uniq.length
end
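The effect of the `uniq` call is that a client appearing in both the write list and the read list with the same IP address and port is counted once. A small sketch with made-up entries shows this. `count_unique_clients` is a hypothetical helper, and the tuples use placeholder symbols where the real lists hold stream protocol objects.

```ruby
# Hypothetical helper: counts distinct [host_ip, port] pairs across both lists.
# Each entry mimics the [stream_protocol, hostname, host_ip, port] tuples that
# num_clients iterates over.
def count_unique_clients(write_entries, read_entries)
  (write_entries + read_entries)
    .map { |_protocol, _hostname, host_ip, port| [host_ip, port] }
    .uniq
    .length
end

shared = [:protocol_a, 'alpha', '10.0.0.5', 50_000]
other  = [:protocol_b, 'beta', '10.0.0.6', 50_001]

puts count_unique_clients([shared], [shared])        # same client on both lists
puts count_unique_clients([shared], [shared, other]) # one extra read client
```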
#read ⇒ Packet
Returns the latest packet read from any of the connected clients. Note that this method blocks until data is available.
# File 'lib/cosmos/io/tcpip_server.rb', line 247

def read
  return nil unless @read_queue
  packet = @read_queue.pop
  return nil unless packet
  @bytes_read += packet.buffer.length
  packet
end
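Because `read` blocks on `Queue#pop` and returns nil when it pops a nil, and `disconnect` pushes a nil onto the read queue, a caller can treat a nil result as "the server was shut down". The sketch below shows that consumer pattern with a plain `Queue`. `drain_until_nil` is a hypothetical helper and the strings stand in for packets.

```ruby
# Hypothetical consumer: keeps doing a blocking pop until it receives nil,
# the same sentinel that disconnect pushes onto the read queue.
def drain_until_nil(queue)
  received = []
  while (item = queue.pop) # Queue#pop blocks while the queue is empty
    received << item
  end
  received
end

queue = Queue.new
consumer = Thread.new { drain_until_nil(queue) }
queue << 'packet 1'
queue << 'packet 2'
queue << nil # unblocks the consumer and ends its loop
p consumer.value
```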
#read_queue_size ⇒ Integer
Returns the number of packets waiting on the read queue.
# File 'lib/cosmos/io/tcpip_server.rb', line 276

def read_queue_size
  if @read_queue
    @read_queue.size
  else
    0
  end
end
#start_raw_logging ⇒ Object
Start raw logging for this interface.
# File 'lib/cosmos/io/tcpip_server.rb', line 306

def start_raw_logging
  @raw_logging_enabled = true
  if @raw_logger_pair
    @write_stream_protocols.each do |stream_protocol, hostname, host_ip, port|
      stream_protocol.stream.raw_logger_pair.start if stream_protocol.stream.raw_logger_pair
    end
    @read_stream_protocols.each do |stream_protocol, hostname, host_ip, port|
      stream_protocol.stream.raw_logger_pair.start if stream_protocol.stream.raw_logger_pair
    end
  end
end
#stop_raw_logging ⇒ Object
Stop raw logging for this interface.
# File 'lib/cosmos/io/tcpip_server.rb', line 319

def stop_raw_logging
  @raw_logging_enabled = false
  if @raw_logger_pair
    @write_stream_protocols.each do |stream_protocol, hostname, host_ip, port|
      stream_protocol.stream.raw_logger_pair.stop if stream_protocol.stream.raw_logger_pair
    end
    @read_stream_protocols.each do |stream_protocol, hostname, host_ip, port|
      stream_protocol.stream.raw_logger_pair.stop if stream_protocol.stream.raw_logger_pair
    end
  end
end
#write(packet) ⇒ Object
# File 'lib/cosmos/io/tcpip_server.rb', line 257

def write(packet)
  return unless @write_queue
  @write_queue << packet.clone
  @bytes_written += packet.buffer.length
  @write_condition_variable.broadcast
end
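`write` only enqueues a copy of the packet and broadcasts on a condition variable; the actual sending happens in the write thread, whose body (`write_thread_body`) is not shown in this listing. The sketch below is one plausible shape for that hand-off, not the COSMOS implementation. `MiniWriter` and its methods are hypothetical, and unlike the listing above it broadcasts while holding the mutex so that a wake-up cannot be missed.

```ruby
# Hypothetical miniature of the write path: write() enqueues and broadcasts,
# and a worker thread sleeps on the condition variable until there is work.
class MiniWriter
  attr_reader :sent

  def initialize
    @queue = Queue.new
    @mutex = Mutex.new
    @condition = ConditionVariable.new
    @sent = []      # stands in for data written to client sockets
    @cancel = false
    @thread = Thread.new { run }
  end

  def write(data)
    @mutex.synchronize do
      @queue << data.dup # copy so later changes by the caller are not sent
      @condition.broadcast
    end
  end

  def stop
    @mutex.synchronize do
      @cancel = true
      @condition.broadcast
    end
    @thread.join
  end

  private

  def run
    @mutex.synchronize do
      loop do
        @sent << @queue.pop until @queue.empty? # drain everything queued so far
        break if @cancel
        @condition.wait(@mutex) # releases the mutex while sleeping
      end
    end
  end
end

writer = MiniWriter.new
writer.write('first')
writer.write('second')
writer.stop
p writer.sent
```

Copying the data before queueing matches the `packet.clone` in `write`: the caller can keep modifying its packet without affecting what the worker later sends.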
#write_queue_size ⇒ Integer
Returns the number of packets waiting on the write queue.
# File 'lib/cosmos/io/tcpip_server.rb', line 285

def write_queue_size
  if @write_queue
    @write_queue.size
  else
    0
  end
end
#write_raw(data) ⇒ Object
# File 'lib/cosmos/io/tcpip_server.rb', line 266

def write_raw(data)
  return unless @write_queue
  packet = Packet.new(nil, nil)
  packet.buffer = data
  @write_queue << packet
  @bytes_written += data.length
  @write_condition_variable.broadcast
end