Class: LogStash::Inputs::Tcp

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/tcp.rb

Overview

Read events over a TCP socket.

Like stdin and file inputs, each event is assumed to be one line of text.

Can either accept connections from clients or connect to a server, depending on ‘mode`.

#### Accepting log4j2 logs

Log4j2 can send JSON over a socket, and we can use that combined with our tcp input to accept the logs.

First, we need to configure your application to send logs in JSON over a socket. The following log4j2.xml accomplishes this task.

Note, you will want to change the ‘host` and `port` settings in this configuration to match your needs.

<Configuration>
  <Appenders>
     <Socket name="Socket" host="localhost" port="12345">
       <JsonLayout compact="true" eventEol="true" />
    </Socket>
  </Appenders>
  <Loggers>
    <Root level="info">
      <AppenderRef ref="Socket"/>
    </Root>
  </Loggers>
</Configuration>

To accept this in Logstash, you will want tcp input and a date filter:

input {
  tcp {
    port => 12345
    codec => json
  }
}

and add a date filter to take log4j2’s ‘timeMillis` field and use it as the event timestamp

filter {
  date {
    match => [ "timeMillis", "UNIX_MS" ]
  }
}

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Tcp



159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/logstash/inputs/tcp.rb', line 159

def initialize(*args)
  super(*args)

  setup_fields!

  self.class.patch_socket_peer!

  # threadsafe socket bookkeeping
  @server_socket = nil
  @client_socket = nil
  @connection_sockets = {}
  @socket_mutex = Mutex.new

  @ssl_context = nil
end

Class Method Details

.patch_socket_peer!Object

Monkey patch TCPSocket and SSLSocket to include socket peer



150
151
152
153
154
155
156
157
# File 'lib/logstash/inputs/tcp.rb', line 150

def self.patch_socket_peer!
  unless TCPSocket < ::LogStash::Util::SocketPeer
    TCPSocket.send :include, ::LogStash::Util::SocketPeer
  end
  unless OpenSSL::SSL::SSLSocket < ::LogStash::Util::SocketPeer
    OpenSSL::SSL::SSLSocket.send :include, ::LogStash::Util::SocketPeer
  end
end

Instance Method Details

#closeObject



204
205
206
207
208
209
210
211
# File 'lib/logstash/inputs/tcp.rb', line 204

def close
  # see related comment in register: we must make sure to close the server socket here
  # because it is created in the register method and we could be in the context of having
  # register called but never run & stop, only close.
  # catch all rescue nil on close to discard any close errors or invalid socket
  server_socket.close rescue nil
  @loop.close rescue nil
end

#decode_buffer(client_ip_address, client_address, client_port, codec, proxy_address, proxy_port, tbuf, ssl_subject) ⇒ Object



213
214
215
216
217
218
219
220
221
222
# File 'lib/logstash/inputs/tcp.rb', line 213

def decode_buffer(client_ip_address, client_address, client_port, codec, proxy_address,
                  proxy_port, tbuf, ssl_subject)
  codec.decode(tbuf) do |event|
    if @proxy_protocol
      event.set(@field_proxy_host, proxy_address) unless event.get(@field_proxy_host)
      event.set(@field_proxy_port, proxy_port) unless event.get(@field_proxy_port)
    end
    enqueue_decorated(event, client_ip_address, client_address, client_port, ssl_subject)
  end
end

#dns_reverse_lookup_enabled?Boolean



230
231
232
# File 'lib/logstash/inputs/tcp.rb', line 230

def dns_reverse_lookup_enabled?
  @dns_reverse_lookup_enabled
end

#flush_codec(codec, client_ip_address, client_address, client_port, ssl_subject) ⇒ Object



224
225
226
227
228
# File 'lib/logstash/inputs/tcp.rb', line 224

def flush_codec(codec, client_ip_address, client_address, client_port, ssl_subject)
  codec.flush do |event|
    enqueue_decorated(event, client_ip_address, client_address, client_port, ssl_subject)
  end
end

#registerObject



175
176
177
178
179
180
181
182
# File 'lib/logstash/inputs/tcp.rb', line 175

def register
  fix_streaming_codecs
  validate_ssl_config!

  if server?
    @loop = InputLoop.new(@id, @host, @port, DecoderImpl.new(@codec, self), @tcp_keep_alive, java_ssl_context)
  end
end

#run(output_queue) ⇒ Object



184
185
186
187
188
189
190
191
192
# File 'lib/logstash/inputs/tcp.rb', line 184

def run(output_queue)
  @output_queue = output_queue
  if server?
    @logger.info("Starting tcp input listener", :address => "#{@host}:#{@port}", :ssl_enabled => @ssl_enabled)
    @loop.run
  else
    run_client()
  end
end

#ssl_peer_verification_enabled?Boolean



234
235
236
237
238
239
240
241
# File 'lib/logstash/inputs/tcp.rb', line 234

def ssl_peer_verification_enabled?
  return false unless @ssl_enabled
  if server?
    @ssl_client_authentication && @ssl_client_authentication != 'none'
  else
    @ssl_verification_mode == 'full'
  end
end

#stopObject



194
195
196
197
198
199
200
201
202
# File 'lib/logstash/inputs/tcp.rb', line 194

def stop
  # force close all sockets which will escape any blocking read with a IO exception
  # and any thread using them will exit.
  # catch all rescue nil on close to discard any close errors or invalid socket
  server_socket.close rescue nil
  @loop.close rescue nil
  client_socket.close rescue nil
  connection_sockets.each{|socket| socket.close rescue nil}
end