Class: Syslogstash::SyslogReader
- Inherits: Object
  - Object
  - Syslogstash::SyslogReader
- Includes: ServiceSkeleton::BackgroundWorker
- Defined in: lib/syslogstash/syslog_reader.rb
Overview
A single socket reader.
Instance Method Summary
- #initialize(config, logstash, metrics) ⇒ SyslogReader (constructor)
  A new instance of SyslogReader.
- #shutdown ⇒ Object
- #start ⇒ Object
  Start reading from the socket file, parsing entries, and flinging them at logstash.
Constructor Details
#initialize(config, logstash, metrics) ⇒ SyslogReader
Returns a new instance of SyslogReader.
# File 'lib/syslogstash/syslog_reader.rb', line 6

def initialize(config, logstash, metrics)
  @config, @logstash, @metrics = config, logstash, metrics
  @logger = config.logger
  @shutdown_reader, @shutdown_writer = IO.pipe

  super
end
Instance Method Details
#shutdown ⇒ Object
# File 'lib/syslogstash/syslog_reader.rb', line 63

def shutdown
  @shutdown_writer.close
end
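The constructor creates a pipe and #shutdown closes its write end, which reads like the common "self-pipe" way of waking a loop blocked in IO.select. The following is a minimal standalone sketch of that idea, not code from Syslogstash: the helper name wait_for_shutdown and the timeouts are assumptions made for illustration.

```ruby
# Sketch of the self-pipe shutdown pattern.
# wait_for_shutdown is a hypothetical helper, not part of Syslogstash.
def wait_for_shutdown(shutdown_reader, timeout)
  # IO.select returns nil on timeout, otherwise [readable, writable, errored].
  ready = IO.select([shutdown_reader], nil, nil, timeout)
  return :timed_out if ready.nil?

  # Once the write end is closed, the read end becomes readable at EOF.
  shutdown_reader.eof? ? :shutdown : :data
end

shutdown_reader, shutdown_writer = IO.pipe

state_before = wait_for_shutdown(shutdown_reader, 0.05) # writer still open
shutdown_writer.close                                   # the step #shutdown performs
state_after = wait_for_shutdown(shutdown_reader, 1)

puts "before close: #{state_before}, after close: #{state_after}"
# prints "before close: timed_out, after close: shutdown"

shutdown_reader.close
```

Because closing the writer makes the reader selectable, a loop that selects on both the pipe and a socket can be interrupted without signals or polling.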
#start ⇒ Object
Start reading from the socket file, parsing entries, and flinging them at logstash.
# File 'lib/syslogstash/syslog_reader.rb', line 19

def start
  config.logger.debug(logloc) { "off we go!" }

  begin
    socket = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM, 0)
    socket.bind(Socket.pack_sockaddr_un(config.syslog_socket))
    File.chmod(0666, config.syslog_socket)
  rescue Errno::EEXIST, Errno::EADDRINUSE
    config.logger.info(logloc) { "socket file #{config.syslog_socket} already exists; deleting" }
    File.unlink(config.syslog_socket) rescue nil
    retry
  rescue StandardError => ex
    raise ex.class, "Error while trying to bind to #{config.syslog_socket}: #{ex.message}", ex.backtrace
  end

  begin
    loop do
      IO.select([@shutdown_reader, socket]).first.each do |fd|
        if fd == socket
          begin
            msg = socket.recvmsg_nonblock
          rescue IO::WaitWritable
            config.logger.debug(logloc) { "select said a message was waiting, but it wasn't. o.O" }
          else
            config.logger.debug(logloc) { "Message received: #{msg.inspect}" }
            @metrics..increment(socket_path: config.syslog_socket)
            @metrics.queue_size.increment({})
            msg.first msg.first.chomp
          end
        elsif fd == @shutdown_reader
          @shutdown_reader.close
          config.logger.debug(logloc) { "Tripped over shutdown reader" }
          break
        end
      end
    end
  ensure
    socket.close
    config.logger.debug(logloc) { "removing socket file #{config.syslog_socket}" }
    File.unlink(config.syslog_socket) rescue nil
  end
end
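To make the bind-and-receive steps of #start easier to follow in isolation, here is a small standalone sketch. It is an illustration under stated assumptions rather than the library's code: bind_datagram_socket is a hypothetical helper, the temporary path and the sample syslog line are made up, and the retry is limited to one attempt for safety.

```ruby
require "socket"
require "tmpdir"

# Bind a Unix datagram socket at +path+, deleting a leftover file once if the
# address is already taken. Hypothetical helper, not part of Syslogstash.
def bind_datagram_socket(path)
  attempts = 0
  begin
    socket = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM, 0)
    socket.bind(Socket.pack_sockaddr_un(path))
    socket
  rescue Errno::EEXIST, Errno::EADDRINUSE
    socket&.close
    attempts += 1
    raise if attempts > 1 # give up instead of retrying forever
    File.unlink(path)
    retry
  end
end

received = nil

Dir.mktmpdir do |dir|
  path = File.join(dir, "log.sock")
  File.write(path, "") # simulate a stale file left by a previous run

  server = bind_datagram_socket(path)

  # A throwaway client standing in for a process that logs to the socket.
  client = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM, 0)
  client.send("<13>Jan  1 00:00:00 host app: hello\n", 0, Socket.pack_sockaddr_un(path))

  # Wait until the socket is readable, then read one datagram without blocking.
  if IO.select([server], nil, nil, 1)
    data, _sender = server.recvmsg_nonblock
    received = data.chomp
  end

  client.close
  server.close
end

puts received
```

Each datagram arrives as one whole message, so a single recvmsg_nonblock call per readable event yields one log entry, with the trailing newline removed by chomp.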