Class: Syslogstash::SyslogReader

Inherits:
Object
  • Object
show all
Includes:
ServiceSkeleton::BackgroundWorker
Defined in:
lib/syslogstash/syslog_reader.rb

Overview

A single socket reader.

Instance Method Summary collapse

Constructor Details

#initialize(config, logstash, metrics) ⇒ SyslogReader

Returns a new instance of SyslogReader.



6
7
8
9
10
11
12
13
14
# File 'lib/syslogstash/syslog_reader.rb', line 6

# Create a reader, remembering its collaborators and preparing the pipe that
# is later used to signal shutdown.
#
# @param config [Object] must respond to +logger+; stored as +@config+
# @param logstash [Object] stored as +@logstash+
# @param metrics [Object] stored as +@metrics+
def initialize(config, logstash, metrics)
  @config   = config
  @logstash = logstash
  @metrics  = metrics
  @logger   = @config.logger

  # #shutdown closes the write end of this pipe; #start watches the read end
  # alongside the syslog socket.
  pipe_read_end, pipe_write_end = IO.pipe
  @shutdown_reader = pipe_read_end
  @shutdown_writer = pipe_write_end

  # Bare `super` forwards config, logstash and metrics unchanged to the
  # next initializer in the ancestor chain.
  super
end

Instance Method Details

#shutdown ⇒ Object



63
64
65
# File 'lib/syslogstash/syslog_reader.rb', line 63

# Signal the read loop in #start to stop.
#
# Closes the write end of the pipe created in #initialize.  #start includes
# the matching read end in its IO.select call, so closing the writer makes
# that read end readable (at EOF) and wakes the loop up.
def shutdown
  @shutdown_writer.close
end

#start ⇒ Object

Start reading from the socket file, parsing entries, and flinging them at logstash.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/syslogstash/syslog_reader.rb', line 19

# Start reading from the socket file, parsing entries, and flinging them at
# logstash.
#
# Binds a Unix datagram socket at +config.syslog_socket+ (mode 0666), then
# blocks, passing every datagram received to +relay_message+ and
# +process_message+, until the shutdown pipe becomes readable (see
# #shutdown).  However the loop ends, the socket is closed and its file is
# removed on the way out.
#
# @return [void]
# @raise [StandardError] an exception of the same class as the underlying
#   failure, with the socket path prepended to the message, when the socket
#   cannot be bound
def start
  config.logger.debug(logloc) { "off we go!" }

  # A stale socket file is deleted and the bind retried, but only a bounded
  # number of times: if the file cannot actually be removed, retrying forever
  # would just spin.
  max_bind_attempts = 3
  bind_attempts = 0

  begin
    bind_attempts += 1
    socket = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM, 0)
    socket.bind(Socket.pack_sockaddr_un(config.syslog_socket))
    File.chmod(0666, config.syslog_socket)
  rescue Errno::EEXIST, Errno::EADDRINUSE => ex
    # Release the descriptor of the socket that failed to bind before trying
    # again (or giving up), rather than leaving it for the GC.
    socket.close if socket
    socket = nil

    if bind_attempts >= max_bind_attempts
      raise ex.class, "Error while trying to bind to #{config.syslog_socket}: #{ex.message}", ex.backtrace
    end

    config.logger.info(logloc) { "socket file #{config.syslog_socket} already exists; deleting" }
    begin
      File.unlink(config.syslog_socket)
    rescue SystemCallError
      nil # best effort; a file that is still in the way fails the next bind
    end
    retry
  rescue StandardError => ex
    socket.close if socket
    raise ex.class, "Error while trying to bind to #{config.syslog_socket}: #{ex.message}", ex.backtrace
  end

  begin
    shutting_down = false

    loop do
      IO.select([@shutdown_reader, socket]).first.each do |fd|
        if fd == socket
          begin
            msg = socket.recvmsg_nonblock
          rescue IO::WaitReadable
            # recvmsg_nonblock reports "nothing to read" with WaitReadable.
            config.logger.debug(logloc) { "select said a message was waiting, but it wasn't.  o.O" }
          else
            config.logger.debug(logloc) { "Message received: #{msg.inspect}" }
            @metrics.messages_received_total.increment(socket_path: config.syslog_socket)
            @metrics.queue_size.increment({})
            relay_message msg.first
            process_message msg.first.chomp
          end
        elsif fd == @shutdown_reader
          @shutdown_reader.close
          config.logger.debug(logloc) { "Tripped over shutdown reader" }
          # `break` only leaves the inner `each`; the flag is what stops the
          # outer loop from selecting on the now-closed reader.
          shutting_down = true
          break
        end
      end

      break if shutting_down
    end
  ensure
    socket.close
    config.logger.debug(logloc) { "removing socket file #{config.syslog_socket}" }
    begin
      File.unlink(config.syslog_socket)
    rescue SystemCallError
      nil # best effort; the file may already be gone
    end
  end
end