Class: LogStash::Inputs::Relp
- Inherits:
-
Base
- Object
- Base
- LogStash::Inputs::Relp
- Defined in:
- lib/logstash/inputs/relp.rb
Overview
Read RELP events over a TCP socket.
For more information about RELP, see <www.rsyslog.com/doc/imrelp.html>
This protocol implements application-level acknowledgements to help protect against message loss.
Message acks only function as far as messages being put into the queue for filters; anything lost after that point will not be retransmitted
Defined Under Namespace
Classes: Interrupted
Instance Method Summary collapse
-
#initialize(*args) ⇒ Relp
constructor
A new instance of Relp.
- #register ⇒ Object
- #run(output_queue) ⇒ Object
-
#teardown ⇒ Object
def run.
Constructor Details
#initialize(*args) ⇒ Relp
Returns a new instance of Relp.
33 34 35 |
# File 'lib/logstash/inputs/relp.rb', line 33 def initialize(*args) super(*args) end |
Instance Method Details
#register ⇒ Object
38 39 40 41 |
# File 'lib/logstash/inputs/relp.rb', line 38 def register @logger.info("Starting relp input listener", :address => "#{@host}:#{@port}") @relp_server = RelpServer.new(@host, @port,['syslog']) end |
#run(output_queue) ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/logstash/inputs/relp.rb', line 61 def run(output_queue) @thread = Thread.current loop do begin # Start a new thread for each connection. Thread.start(@relp_server.accept) do |client| rs = client[0] socket = client[1] # monkeypatch a 'peer' method onto the socket. socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end } peer = socket.peer @logger.debug("Relp Connection to #{peer} created") begin relp_stream(rs,socket, output_queue, peer) rescue Relp::ConnectionClosed => e @logger.debug("Relp Connection to #{peer} Closed") rescue Relp::RelpError => e @logger.warn('Relp error: '+e.class.to_s+' '+e.) #TODO: Still not happy with this, are they all warn level? #Will this catch everything I want it to? #Relp spec says to close connection on error, ensure this is the case end end # Thread.start rescue Relp::InvalidCommand,Relp::InappropriateCommand => e @logger.warn('Relp client trying to open connection with something other than open:'+e.) rescue Relp::InsufficientCommands @logger.warn('Relp client incapable of syslog') rescue IOError, Interrupted if @interrupted # Intended shutdown, get out of the loop @relp_server.shutdown break else # Else it was a genuine IOError caused by something else, so propagate it up.. raise end end end # loop end |
#teardown ⇒ Object
def run
101 102 103 104 |
# File 'lib/logstash/inputs/relp.rb', line 101 def teardown @interrupted = true @thread.raise(Interrupted.new) end |