Class: LogStash::Inputs::Tcp
- Inherits:
-
Base
- Object
- Base
- LogStash::Inputs::Tcp
- Defined in:
- lib/logstash/inputs/tcp.rb
Overview
Read events over a TCP socket.
Like stdin and file inputs, each event is assumed to be one line of text.
Can either accept connections from clients or connect to a server, depending on `mode`.
Defined Under Namespace
Classes: Interrupted
Instance Method Summary collapse
-
#initialize(*args) ⇒ Tcp
constructor
A new instance of Tcp.
- #register ⇒ Object
- #run(output_queue) ⇒ Object
-
#run_client(output_queue) ⇒ Object
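Runs the input in client mode: repeatedly connects to the configured host and port and hands each connection to `handle_socket`.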
-
#run_server(output_queue) ⇒ Object
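Runs the input in server mode: accepts connections on the listening socket and starts one thread per connection.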
- #teardown ⇒ Object
Constructor Details
#initialize(*args) ⇒ Tcp
Returns a new instance of Tcp.
55 56 57 |
# File 'lib/logstash/inputs/tcp.rb', line 55
#
# Builds a new Tcp input by forwarding every constructor argument, unchanged,
# to the superclass constructor.
#
# @param args [Array] arguments passed straight through to the superclass
def initialize(*args)
  # Bare `super` re-sends exactly the arguments this method received.
  super
end
Instance Method Details
#register ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/logstash/inputs/tcp.rb', line 60
#
# Prepares the input before it runs: loads the socket/SSL libraries, mixes
# SocketPeer into the socket classes, builds the SSL context when SSL is
# enabled, and (in server mode only) opens the listening socket.
#
# @raise [Errno::EADDRINUSE] re-raised after logging when the listening
#   address is already in use
def register
  require "socket"
  require "timeout"
  require "openssl"

  # monkey patch TCPSocket and SSLSocket to include socket peer
  [TCPSocket, OpenSSL::SSL::SSLSocket].each do |socket_class|
    socket_class.module_eval { include ::LogStash::Util::SocketPeer }
  end

  fix_streaming_codecs

  if @ssl_enable
    @ssl_context = OpenSSL::SSL::SSLContext.new

    certificate_pem = File.read(@ssl_cert)
    @ssl_context.cert = OpenSSL::X509::Certificate.new(certificate_pem)

    private_key_pem = File.read(@ssl_key)
    @ssl_context.key = OpenSSL::PKey::RSA.new(private_key_pem, @ssl_key_passphrase)

    if @ssl_verify
      @cert_store = OpenSSL::X509::Store.new
      # Load the system default certificate path to the store
      @cert_store.set_default_paths
      # @ssl_cacert may name either a directory of certificates or a single file.
      File.directory?(@ssl_cacert) ? @cert_store.add_path(@ssl_cacert) : @cert_store.add_file(@ssl_cacert)
      @ssl_context.cert_store = @cert_store
      # Require the peer to present a certificate and verify it.
      @ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER | OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT
    end
  end

  # Everything below only applies when listening for inbound connections.
  return unless server?

  @logger.info("Starting tcp input listener", :address => "#{@host}:#{@port}")
  begin
    @server_socket = TCPServer.new(@host, @port)
  rescue Errno::EADDRINUSE
    @logger.error("Could not start TCP server: Address in use", :host => @host, :port => @port)
    raise
  end

  # Wrap the plain listener so accepted connections are SSL sockets.
  @server_socket = OpenSSL::SSL::SSLServer.new(@server_socket, @ssl_context) if @ssl_enable
end
#run(output_queue) ⇒ Object
165 166 167 168 169 170 171 |
# File 'lib/logstash/inputs/tcp.rb', line 165
#
# Entry point for the input: dispatches to the server loop or the client
# loop depending on what `server?` reports.
#
# @param output_queue [Object] passed through unchanged to run_server / run_client
def run(output_queue)
  return run_server(output_queue) if server?

  run_client(output_queue)
end
#run_client(output_queue) ⇒ Object
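Runs the input in client mode: repeatedly connects to the configured host and port and hands each connection to `handle_socket`.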
209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 |
# File 'lib/logstash/inputs/tcp.rb', line 209
#
# Client-mode loop: connects to @host:@port, optionally performs an SSL
# handshake, and hands the connected socket to handle_socket. When
# handle_socket returns, the loop opens a fresh connection.
#
# @param output_queue [Object] passed through unchanged to handle_socket
def run_client(output_queue)
  @thread = Thread.current
  while true
    client_socket = TCPSocket.new(@host, @port)
    if @ssl_enable
      # Keep a handle on the raw TCP socket: SSLSocket#sync_close defaults to
      # false, so closing the SSL wrapper would not close the underlying socket.
      tcp_socket = client_socket
      client_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, @ssl_context)
      begin
        client_socket.connect
      rescue OpenSSL::SSL::SSLError => ssle
        @logger.error("SSL Error", :exception => ssle, :backtrace => ssle.backtrace)
        # Release the failed connection before retrying; otherwise every
        # failed handshake leaves an open socket behind.
        tcp_socket.close rescue nil
        # NOTE(mrichar1): Hack to prevent hammering peer
        sleep(5)
        next
      end
    end
    @logger.debug("Opened connection", :client => "#{client_socket.peer}")
    handle_socket(client_socket, client_socket.peeraddr[3], output_queue, @codec.clone)
  end # loop
ensure
  # Best-effort close; client_socket is nil if the very first connect failed.
  client_socket.close rescue nil
end
#run_server(output_queue) ⇒ Object
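Runs the input in server mode: accepts connections on the listening socket and starts one thread per connection.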
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
# File 'lib/logstash/inputs/tcp.rb', line 173
#
# Server-mode loop: accepts connections on @server_socket and starts one
# client thread per connection. The loop ends when an IOError or
# LogStash::ShutdownSignal arrives while @interrupted is set; in that case
# every live client thread is sent a LogStash::ShutdownSignal.
#
# @param output_queue [Object] passed through unchanged to client_thread
# @raise [IOError] when accept fails and no shutdown was requested
def run_server(output_queue)
  @thread = Thread.current
  @client_threads = []
  @client_threads_lock = Mutex.new

  while true
    begin
      connection = @server_socket.accept
      # start a new thread for each connection.
      # The thread is created inside the lock, exactly as it is registered.
      @client_threads_lock.synchronize do
        @client_threads << client_thread(output_queue, connection)
      end
    rescue OpenSSL::SSL::SSLError => ssl_error
      # NOTE(mrichar1): This doesn't return a useful error message for some reason
      @logger.error("SSL Error", :exception => ssl_error, :backtrace => ssl_error.backtrace)
    rescue IOError, LogStash::ShutdownSignal
      # Not an intended shutdown: propagate the error up. A ShutdownSignal
      # re-raised here is absorbed by the method-level rescue below.
      raise unless @interrupted

      @server_socket.close rescue nil
      # Snapshot the list under the lock, then signal outside of it.
      workers = @client_threads_lock.synchronize { @client_threads.dup }
      workers.each do |worker|
        worker.raise(LogStash::ShutdownSignal) if worker.alive?
      end
      # intended shutdown, get out of the loop
      break
    end
  end # loop
rescue LogStash::ShutdownSignal
  # nothing to do
ensure
  @server_socket.close rescue nil
end
#teardown ⇒ Object
232 233 234 235 236 |
# File 'lib/logstash/inputs/tcp.rb', line 232
#
# Marks the shutdown as intentional when running in server mode by setting
# @interrupted, the flag run_server checks when an IOError or
# LogStash::ShutdownSignal interrupts its accept loop. Does nothing when
# `server?` is false.
def teardown
  @interrupted = true if server?
end