Class: LogStash::Inputs::Tcp

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/tcp.rb

Overview

Read events over a TCP socket.

Like stdin and file inputs, each event is assumed to be one line of text.

Can either accept connections from clients or connect to a server, depending on ‘mode`.

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Tcp

Returns a new instance of Tcp.



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/logstash/inputs/tcp.rb', line 56

def initialize(*args)
  super(*args)

  # monkey patch TCPSocket and SSLSocket to include socket peer
  TCPSocket.module_eval{include ::LogStash::Util::SocketPeer}
  OpenSSL::SSL::SSLSocket.module_eval{include ::LogStash::Util::SocketPeer}

  # threadsafe socket bookkeeping
  @server_socket = nil
  @client_socket = nil
  @connection_sockets = {}
  @socket_mutex = Mutex.new

  @ssl_context = nil
end

Instance Method Details

#closeObject



99
100
101
102
103
104
105
# File 'lib/logstash/inputs/tcp.rb', line 99

def close
  # see related comment in register: we must make sure to close the server socket here
  # because it is created in the register method and we could be in the context of having
  # register called but never run & stop, only close.
  # catch all rescue nil on close to discard any close errors or invalid socket
  server_socket.close rescue nil
end

#registerObject



72
73
74
75
76
77
78
79
80
# File 'lib/logstash/inputs/tcp.rb', line 72

def register
  fix_streaming_codecs

  # note that since we are opening a socket in register, we must also make sure we close it
  # in the close method even if we also close it in the stop method since we could have
  # a situation where register is called but not run & stop.

  self.server_socket = new_server_socket if server?
end

#run(output_queue) ⇒ Object



82
83
84
85
86
87
88
# File 'lib/logstash/inputs/tcp.rb', line 82

def run(output_queue)
  if server?
    run_server(output_queue)
  else
    run_client(output_queue)
  end
end

#stopObject



90
91
92
93
94
95
96
97
# File 'lib/logstash/inputs/tcp.rb', line 90

def stop
  # force close all sockets which will escape any blocking read with a IO exception
  # and any thread using them will exit.
  # catch all rescue nil on close to discard any close errors or invalid socket
  server_socket.close rescue nil
  client_socket.close rescue nil
  connection_sockets.each{|socket| socket.close rescue nil}
end