Class: Syslogstash::SyslogReader

Inherits:
Object
  • Object
show all
Includes:
Worker
Defined in:
lib/syslogstash/syslog_reader.rb

Overview

A single socket reader.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Worker

#stop, #thread, #wait

Constructor Details

#initialize(file, config, logstash, metrics) ⇒ SyslogReader

Returns a new instance of SyslogReader.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/syslogstash/syslog_reader.rb', line 10

# Set up a reader for a single syslog socket.
#
# @param file [String] path of the socket file this reader serves
# @param config [Hash, nil] optional per-socket settings; the keys read
#   here are 'add_fields' (expected to be a Hash) and 'relay_to'
#   (expected to be an Array).  nil is treated as an empty Hash.
# @param logstash [Object] stored in @logstash
# @param metrics [Object] stored in @metrics
# @raise [ArgumentError] if 'add_fields' is not a Hash or 'relay_to' is
#   not an Array
def initialize(file, config, logstash, metrics)
  @file     = file
  @logstash = logstash
  @metrics  = metrics

  settings = config || {}

  # Both options are read before either is validated; an absent (or
  # nil/false) entry falls back to an empty container.
  @add_fields = settings['add_fields'] || {}
  @relay_to   = settings['relay_to']   || []

  raise ArgumentError, "add_fields parameter to socket #{file} must be a hash" unless @add_fields.is_a?(Hash)
  raise ArgumentError, "relay_to parameter to socket #{file} must be an array" unless @relay_to.is_a?(Array)

  log { "initialized syslog socket #{file} with config #{settings.inspect}" }
end

Instance Attribute Details

#file ⇒ Object (readonly)

Returns the value of attribute file.



8
9
10
# File 'lib/syslogstash/syslog_reader.rb', line 8

# Reader for the socket file value given to the constructor.
#
# @return [Object] the current value of @file
def file; @file; end

Instance Method Details

#run ⇒ Object

Start reading from the socket file, parsing entries, and flinging them at logstash. This method will return, with the operation continuing in a separate thread.



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/syslogstash/syslog_reader.rb', line 34

# Start reading from the socket file, parsing entries, and flinging them
# at logstash.  This method returns as soon as the socket is bound; the
# read loop continues in a separate thread, which is stored in @worker.
#
# If the socket path already exists it is removed and the bind is
# attempted once more.  A second failure is raised to the caller rather
# than retried forever.
#
# @return [Thread] the worker thread running the read loop
# @raise [SystemCallError] if the socket cannot be created, bound or
#   chmod'ed
def run
  debug { "#run called" }

  socket  = nil
  retried = false

  begin
    socket = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM, 0)
    socket.bind(Socket.pack_sockaddr_un(@file))
    File.chmod(0666, @file)
  rescue Errno::EEXIST, Errno::EADDRINUSE
    # Never leak the half-set-up socket, whether we retry or give up.
    socket.close if socket
    socket = nil

    # One retry only: if removing the stale path did not help, spinning
    # here would never make progress.
    if retried
      log { "Error while trying to bind to #{@file}" }
      raise
    end
    retried = true

    log { "socket file #{@file} already exists; deleting" }
    begin
      File.unlink(@file)
    rescue SystemCallError
      # Best effort; if removal failed, the retried bind reports it.
    end
    retry
  rescue SystemCallError
    socket.close if socket
    log { "Error while trying to bind to #{@file}" }
    raise
  end

  @worker = Thread.new do
    begin
      loop do
        msg = socket.recvmsg
        debug { "Message received: #{msg.inspect}" }
        @metrics.received(@file, Time.now)

        # recvmsg returns an array whose first element is the datagram.
        payload = msg.first
        process_message payload.chomp
        relay_message payload
      end
    ensure
      # Runs however the loop ends, so the socket and its file are
      # always cleaned up.
      socket.close
      log { "removing socket file #{@file}" }
      begin
        File.unlink(@file)
      rescue SystemCallError
        # Best effort; the file may already be gone.
      end
    end
  end
end