Class: LogStash::Outputs::Tcp

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/outputs/tcp.rb

Overview

Write events over a TCP socket.

Each event json is separated by a newline.

Can either accept connections from clients or connect to a server, depending on ‘mode`.

Defined Under Namespace

Classes: Client

Instance Method Summary collapse

Instance Method Details

#receive(event) ⇒ Object



134
135
136
137
138
139
140
141
142
143
144
# File 'lib/logstash/outputs/tcp.rb', line 134

def receive(event)
  return unless output?(event)

  #if @message_format
    #output = event.sprintf(@message_format) + "\n"
  #else
    #output = event.to_hash.to_json + "\n"
  #end
  
  @codec.encode(event)
end

#registerObject



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/logstash/outputs/tcp.rb', line 70

def register
  require "stud/try"
  if server?
    workers_not_supported

    @logger.info("Starting tcp output listener", :address => "#{@host}:#{@port}")
    @server_socket = TCPServer.new(@host, @port)
    @client_threads = []

    @accept_thread = Thread.new(@server_socket) do |server_socket|
      loop do
        client_thread = Thread.start(server_socket.accept) do |client_socket|
          client = Client.new(client_socket, @logger)
          Thread.current[:client] = client
          client.run
        end
        @client_threads << client_thread
      end
    end

    @codec.on_event do |payload|
      @client_threads.each do |client_thread|
        client_thread[:client].write(payload)
      end
      @client_threads.reject! {|t| !t.alive? }
    end
  else
    client_socket = nil
    @codec.on_event do |payload|
      begin
        client_socket = connect unless client_socket
        r,w,e = IO.select([client_socket], [client_socket], [client_socket], nil)
        # don't expect any reads, but a readable socket might
        # mean the remote end closed, so read it and throw it away.
        # we'll get an EOFError if it happens.
        client_socket.sysread(16384) if r.any?

        # Now send the payload
        client_socket.syswrite(payload) if w.any?
      rescue => e
        @logger.warn("tcp output exception", :host => @host, :port => @port,
                     :exception => e, :backtrace => e.backtrace)
        client_socket.close rescue nil
        client_socket = nil
        sleep @reconnect_interval
        retry
      end
    end
  end
end