Class: LogStash::Inputs::Relp

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/relp.rb

Overview

Read RELP events over a TCP socket.

For more information about RELP, see <www.rsyslog.com/doc/imrelp.html>

This protocol implements application-level acknowledgements to help protect against message loss.

Message acks only function as far as messages being put into the queue for filters; anything lost after that point will not be retransmitted

Defined Under Namespace

Classes: Interrupted

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Relp

Returns a new instance of Relp.



33
34
35
# File 'lib/logstash/inputs/relp.rb', line 33

def initialize(*args)
  super(*args)
end

Instance Method Details

#registerObject



38
39
40
41
# File 'lib/logstash/inputs/relp.rb', line 38

def register
  @logger.info("Starting relp input listener", :address => "#{@host}:#{@port}")
  @relp_server = RelpServer.new(@host, @port,['syslog'])
end

#run(output_queue) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/logstash/inputs/relp.rb', line 61

def run(output_queue)
  @thread = Thread.current
  loop do
    begin
      # Start a new thread for each connection.
      Thread.start(@relp_server.accept) do |client|
          rs = client[0]
          socket = client[1]
          # monkeypatch a 'peer' method onto the socket.
          socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
          peer = socket.peer
          @logger.debug("Relp Connection to #{peer} created")
        begin
          relp_stream(rs,socket, output_queue, peer)
        rescue Relp::ConnectionClosed => e
          @logger.debug("Relp Connection to #{peer} Closed")
        rescue Relp::RelpError => e
          @logger.warn('Relp error: '+e.class.to_s+' '+e.message)
          #TODO: Still not happy with this, are they all warn level?
          #Will this catch everything I want it to?
          #Relp spec says to close connection on error, ensure this is the case
        end
      end # Thread.start
    rescue Relp::InvalidCommand,Relp::InappropriateCommand => e
      @logger.warn('Relp client trying to open connection with something other than open:'+e.message)
    rescue Relp::InsufficientCommands
      @logger.warn('Relp client incapable of syslog')
    rescue IOError, Interrupted
      if @interrupted
        # Intended shutdown, get out of the loop
        @relp_server.shutdown
        break
      else
        # Else it was a genuine IOError caused by something else, so propagate it up..
        raise
      end
    end
  end # loop
end

#teardownObject

def run



101
102
103
104
# File 'lib/logstash/inputs/relp.rb', line 101

def teardown
  @interrupted = true
  @thread.raise(Interrupted.new)
end