Class: Syslogstash::SyslogReader

Inherits:
Object
  • Object
show all
Includes:
Worker
Defined in:
lib/syslogstash/syslog_reader.rb

Overview

A single socket reader.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Worker

#stop, #thread, #wait

Constructor Details

#initialize(file, config, logstash, metrics) ⇒ SyslogReader

Returns a new instance of SyslogReader.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/syslogstash/syslog_reader.rb', line 10

# Build a reader for a single syslog socket.
#
# @param file [String] path of the socket file this reader is responsible
#   for; it is interpolated into the error and log messages below.
# @param config [Hash, nil] per-socket settings; `nil` is treated as an
#   empty hash.  Recognised keys are `'add_fields'` (must be a Hash,
#   defaults to `{}`) and `'relay_to'` (must be an Array, defaults to `[]`).
# @param logstash [Object] stored as-is in `@logstash` (presumably the
#   writer that parsed messages are handed to -- confirm against the caller).
# @param metrics [Object] stored as-is in `@metrics`.
#
# @raise [ArgumentError] if `add_fields` is not a Hash or `relay_to` is not
#   an Array.
def initialize(file, config, logstash, metrics)
	@file     = file
	@logstash = logstash
	@metrics  = metrics

	# A socket declared without any settings arrives here as nil.
	config ||= {}

	@add_fields = config['add_fields'] || {}
	@relay_to   = config['relay_to']   || []

	# Reject badly-typed settings now, before they are stored and used.
	raise ArgumentError, "add_fields parameter to socket #{file} must be a hash" unless @add_fields.is_a?(Hash)
	raise ArgumentError, "relay_to parameter to socket #{file} must be an array" unless @relay_to.is_a?(Array)

	log { "initialized syslog socket #{file} with config #{config.inspect}" }
end

Instance Attribute Details

#file ⇒ Object (readonly)

Returns the value of attribute file.



8
9
10
# File 'lib/syslogstash/syslog_reader.rb', line 8

# Read-only accessor for the socket file this reader was constructed with.
#
# @return [Object] the value held in `@file`, exactly as it was passed to
#   the constructor.
def file
  @file
end

Instance Method Details

#run ⇒ Object

Start reading from the socket file, parsing entries, and flinging them at logstash. This method will return, with the operation continuing in a separate thread.



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/syslogstash/syslog_reader.rb', line 34

# Start reading from the socket file, parsing entries, and flinging them at
# logstash.  This method will return, with the operation continuing in a
# separate thread.
#
# A datagram Unix socket is bound at `@file` and made world-writable
# (mode 0666).  If something already exists at that path it is deleted and
# the bind is attempted exactly one more time; a second failure is raised to
# the caller rather than retried forever.  A socket whose bind failed is
# always closed before retrying or raising, so no descriptor is leaked.
#
# When the worker thread ends for any reason, the socket is closed and the
# socket file is removed.
#
# @return [Thread] the worker thread (also stored in `@worker`).
# @raise [SystemCallError] if the socket cannot be created, bound or
#   chmod-ed.
# @raise [ArgumentError] if `@file` cannot be packed into a Unix socket
#   address (raised by `Socket.pack_sockaddr_un`).
def run
	debug { "#run called" }

	# Pack the address once, before any socket is opened: it does not change
	# between attempts, and a bad path then fails with nothing to clean up.
	address            = Socket.pack_sockaddr_un(@file)
	stale_file_removed = false
	socket             = nil

	begin
		socket = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM, 0)
		socket.bind(address)
		File.chmod(0666, @file)
	rescue SystemCallError => ex
		# Whatever went wrong, this socket is unusable; release its descriptor
		# instead of waiting for the garbage collector.
		if socket
			socket.close
			socket = nil
		end

		path_in_use = ex.is_a?(Errno::EEXIST) || ex.is_a?(Errno::EADDRINUSE)

		if path_in_use && !stale_file_removed
			# Only ever try this once: if the path cannot be cleared (it is a
			# directory, we lack permission, ...) retrying would spin forever.
			stale_file_removed = true
			log { "socket file #{@file} already exists; deleting" }
			begin
				File.unlink(@file)
			rescue SystemCallError
				# Deliberately ignored: if the path is still occupied, the retried
				# bind fails and that error is the one reported to the caller.
			end
			retry
		end

		log { "Error while trying to bind to #{@file}" }
		raise
	end

	@worker = Thread.new do
		begin
			loop do
				# recvmsg returns an array; its first element is the received data.
				msg = socket.recvmsg
				debug { "Message received: #{msg.inspect}" }
				@metrics.received(@file, Time.now)
				# The chomp-ed text goes to process_message; the untouched original
				# goes to relay_message (both are defined outside this method).
				process_message msg.first.chomp
				relay_message msg.first
			end
		ensure
			socket.close
			log { "removing socket file #{@file}" }
			begin
				File.unlink(@file)
			rescue SystemCallError
				# Best-effort cleanup: the file may already be gone, and raising
				# from here would mask whatever ended the read loop.
			end
		end
	end
end