Class: LogStash::Inputs::Tcp
- Inherits:
-
Base
- Object
- Base
- LogStash::Inputs::Tcp
- Defined in:
- lib/logstash/inputs/tcp.rb
Overview
Read events over a TCP socket.
Like stdin and file inputs, each event is assumed to be one line of text.
Can either accept connections from clients or connect to a server, depending on `mode`.
#### Accepting log4j2 logs
Log4j2 can send JSON over a socket, and we can use that combined with our tcp input to accept the logs.
First, configure your application to send logs as JSON over a socket. The following log4j2.xml accomplishes this task.
Note, you will want to change the `host` and `port` settings in this configuration to match your needs.
<Configuration>
<Appenders>
<Socket name="Socket" host="localhost" port="12345">
<JsonLayout compact="true" eventEol="true" />
</Socket>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="Socket"/>
</Root>
</Loggers>
</Configuration>
To accept this in Logstash, you will want a tcp input and a date filter:
input {
tcp {
port => 12345
codec => json
}
}
and add a date filter to take log4j2’s `timeMillis` field and use it as the event timestamp:
filter {
date {
match => [ "timeMillis", "UNIX_MS" ]
}
}
Class Method Summary collapse
-
.patch_socket_peer! ⇒ Object
Monkey patch TCPSocket and SSLSocket to include socket peer.
Instance Method Summary collapse
- #close ⇒ Object
- #decode_buffer(client_ip_address, client_address, client_port, codec, proxy_address, proxy_port, tbuf, ssl_subject) ⇒ Object
- #dns_reverse_lookup_enabled? ⇒ Boolean
- #flush_codec(codec, client_ip_address, client_address, client_port, ssl_subject) ⇒ Object
-
#initialize(*args) ⇒ Tcp
constructor
A new instance of Tcp.
- #register ⇒ Object
- #run(output_queue) ⇒ Object
- #ssl_peer_verification_enabled? ⇒ Boolean
- #stop ⇒ Object
Constructor Details
#initialize(*args) ⇒ Tcp
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
# File 'lib/logstash/inputs/tcp.rb', line 159

# Builds a new Tcp input.
#
# Every argument is handed unchanged to the superclass constructor. Afterwards
# the instance calls +setup_fields!+, asks its class to apply the socket-peer
# patch, and resets its socket bookkeeping to an empty state.
#
# @param args [Array] arguments forwarded as-is to the superclass constructor
def initialize(*args)
  # Bare `super` re-sends exactly the arguments this method received.
  super
  setup_fields!
  self.class.patch_socket_peer!

  # threadsafe socket bookkeeping
  @server_socket = nil
  @client_socket = nil
  @connection_sockets = {}
  @socket_mutex = Mutex.new
  @ssl_context = nil
end
Class Method Details
.patch_socket_peer! ⇒ Object
Monkey patch TCPSocket and SSLSocket to include socket peer
150 151 152 153 154 155 156 157 |
# File 'lib/logstash/inputs/tcp.rb', line 150

# Monkey patch TCPSocket and SSLSocket to include socket peer.
#
# A class is only touched when it does not already descend from
# ::LogStash::Util::SocketPeer, so calling this repeatedly does not
# include the module a second time.
def self.patch_socket_peer!
  peer_mixin = ::LogStash::Util::SocketPeer

  # `send` is used because the original invokes `include` that way.
  TCPSocket.send(:include, peer_mixin) unless TCPSocket < peer_mixin
  OpenSSL::SSL::SSLSocket.send(:include, peer_mixin) unless OpenSSL::SSL::SSLSocket < peer_mixin
end
Instance Method Details
#close ⇒ Object
204 205 206 207 208 209 210 211 |
# File 'lib/logstash/inputs/tcp.rb', line 204

# Releases the server socket and the input loop.
#
# The server socket is created in +register+, and this method may be called
# after +register+ without +run+ or +stop+ ever having run, so the socket has
# to be closed here as well. Any StandardError raised while closing (including
# one caused by an invalid or missing socket) is deliberately discarded.
def close
  begin
    server_socket.close
  rescue StandardError
    nil # best effort: ignore close errors
  end

  begin
    @loop.close
  rescue StandardError
    nil # best effort: ignore close errors
  end
end
#decode_buffer(client_ip_address, client_address, client_port, codec, proxy_address, proxy_port, tbuf, ssl_subject) ⇒ Object
213 214 215 216 217 218 219 220 221 222 |
# File 'lib/logstash/inputs/tcp.rb', line 213

# Feeds +tbuf+ to +codec+ and enqueues every event the codec yields.
#
# When @proxy_protocol is set, the proxy host and port fields are filled in
# on each event, but only where the event does not already carry a value.
#
# @param client_ip_address passed through to +enqueue_decorated+
# @param client_address passed through to +enqueue_decorated+
# @param client_port passed through to +enqueue_decorated+
# @param codec object whose +decode+ yields events for the given buffer
# @param proxy_address value stored under @field_proxy_host when absent
# @param proxy_port value stored under @field_proxy_port when absent
# @param tbuf the buffer handed to +codec.decode+
# @param ssl_subject passed through to +enqueue_decorated+
def decode_buffer(client_ip_address, client_address, client_port, codec, proxy_address, proxy_port, tbuf, ssl_subject)
  codec.decode(tbuf) do |decoded|
    if @proxy_protocol
      # Ordered pairs (host first, then port) so the checks run in the same
      # sequence as two separate statements would.
      [[@field_proxy_host, proxy_address], [@field_proxy_port, proxy_port]].each do |field, value|
        decoded.set(field, value) unless decoded.get(field)
      end
    end

    enqueue_decorated(decoded, client_ip_address, client_address, client_port, ssl_subject)
  end
end
#dns_reverse_lookup_enabled? ⇒ Boolean
230 231 232 |
# File 'lib/logstash/inputs/tcp.rb', line 230

# Reports the DNS reverse lookup flag held by this input.
#
# @return [Boolean] the current value of @dns_reverse_lookup_enabled
def dns_reverse_lookup_enabled?
  @dns_reverse_lookup_enabled
end
#flush_codec(codec, client_ip_address, client_address, client_port, ssl_subject) ⇒ Object
224 225 226 227 228 |
# File 'lib/logstash/inputs/tcp.rb', line 224

# Flushes +codec+ and enqueues every event it yields while flushing.
#
# @param codec object whose +flush+ yields any remaining events
# @param client_ip_address passed through to +enqueue_decorated+
# @param client_address passed through to +enqueue_decorated+
# @param client_port passed through to +enqueue_decorated+
# @param ssl_subject passed through to +enqueue_decorated+
def flush_codec(codec, client_ip_address, client_address, client_port, ssl_subject)
  codec.flush { |flushed| enqueue_decorated(flushed, client_ip_address, client_address, client_port, ssl_subject) }
end
#register ⇒ Object
175 176 177 178 179 180 181 182 |
# File 'lib/logstash/inputs/tcp.rb', line 175

# Prepares the input before it is run.
#
# Calls +fix_streaming_codecs+ and +validate_ssl_config!+, then, in server
# mode only, builds the InputLoop and stores it in @loop.
#
# @return [Object, nil] the new InputLoop in server mode, otherwise nil
def register
  fix_streaming_codecs
  validate_ssl_config!
  return unless server?

  decoder = DecoderImpl.new(@codec, self)
  @loop = InputLoop.new(@id, @host, @port, decoder, @tcp_keep_alive, java_ssl_context)
end
#run(output_queue) ⇒ Object
184 185 186 187 188 189 190 191 192 |
# File 'lib/logstash/inputs/tcp.rb', line 184

# Starts the input.
#
# The queue is remembered in @output_queue. In server mode a startup message
# is logged and @loop is run; otherwise the work is delegated to +run_client+.
#
# @param output_queue the queue stored in @output_queue
def run(output_queue)
  @output_queue = output_queue
  return run_client unless server?

  listen_address = "#{@host}:#{@port}"
  @logger.info("Starting tcp input listener", address: listen_address, ssl_enabled: @ssl_enabled)
  @loop.run
end
#ssl_peer_verification_enabled? ⇒ Boolean
234 235 236 237 238 239 240 241 |
# File 'lib/logstash/inputs/tcp.rb', line 234

# Tells whether the SSL peer is verified.
#
# Always false when @ssl_enabled is not set. In server mode the answer depends
# on @ssl_client_authentication being set to something other than 'none'; in
# client mode it depends on @ssl_verification_mode being 'full'.
#
# @return [Boolean, nil] truthy when peer verification is on; nil is possible
#   in server mode when @ssl_client_authentication is nil
def ssl_peer_verification_enabled?
  return false unless @ssl_enabled
  return @ssl_verification_mode == 'full' unless server?

  @ssl_client_authentication && @ssl_client_authentication != 'none'
end
#stop ⇒ Object
194 195 196 197 198 199 200 201 202 |
# File 'lib/logstash/inputs/tcp.rb', line 194

# Force-closes every socket this input knows about.
#
# Closing a socket makes any blocking read on it fail with an IO exception,
# which lets the thread using that socket exit. Each close is best effort:
# a StandardError raised while closing (for example on an invalid socket) is
# discarded so the remaining sockets are still closed.
def stop
  begin
    server_socket.close
  rescue StandardError
    nil # best effort: ignore close errors
  end

  begin
    @loop.close
  rescue StandardError
    nil # best effort: ignore close errors
  end

  begin
    client_socket.close
  rescue StandardError
    nil # best effort: ignore close errors
  end

  connection_sockets.each do |socket|
    begin
      socket.close
    rescue StandardError
      nil # best effort: ignore close errors
    end
  end
end