Class: Observability::Collector::RabbitMQ

Inherits:
Observability::Collector show all
Extended by:
Configurability, Loggability
Defined in:
lib/observability/collector/rabbitmq.rb

Overview

A collector that re-injects events over AMQP to a RabbitMQ cluster.

Constant Summary collapse

MAX_EVENT_BYTES =

The maximum size of event messages

64 * 1024
LOOP_TIMER =

The number of seconds to wait between IO loops

0.25
DEFAULT_PUBLISH_OPTIONS =

Default options for publication

{
	mandatory:  false,
	persistent: true
}

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Observability::Collector

configured_type, inherited, start

Constructor Details

#initializeRabbitMQ

Create a new UDP collector



106
107
108
109
110
111
112
113
114
115
116
# File 'lib/observability/collector/rabbitmq.rb', line 106

def initialize
	super

	@socket        = UDPSocket.new
	@amqp_session  = nil
	@amqp_channel  = Concurrent::ThreadLocalVar.new { @amqp_session.create_channel }
	@amqp_exchange = Concurrent::ThreadLocalVar.new do
		@amqp_channel.value.headers( self.class.exchange, passive: true )
	end
	@processing    = false
end

Class Method Details

.amqp_session_optionsObject

Fetch a Hash of AMQP options.



67
68
69
70
71
72
73
74
75
# File 'lib/observability/collector/rabbitmq.rb', line 67

def self::amqp_session_options
	return {
		logger: Loggability[ Observability ],
		heartbeat: self.heartbeat,
		exchange: self.exchange,
		vhost: self.vhost,
		threaded: self.threaded,
	}
end

.capabilities_list(server_info) ⇒ Object

Return a formatted list of the server’s capabilities listed in server_info.



79
80
81
82
83
# File 'lib/observability/collector/rabbitmq.rb', line 79

def self::capabilities_list( server_info )
	server_info.
		map {|name,enabled| enabled ? name : nil }.
		compact.join(', ')
end

.configured_amqp_sessionObject

Establish the connection to RabbitMQ based on the loaded configuration.



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/observability/collector/rabbitmq.rb', line 87

def self::configured_amqp_session
	uri = self.broker_uri or raise "No broker_uri configured."
	options = self.amqp_session_options

	session = Bunny.new( uri, options )
	session.start

	self.log.info "Connected to %s v%s server: %s" % [
		session.server_properties['product'],
		session.server_properties['version'],
		self.capabilities_list( session.server_properties['capabilities'] ),
	]

	return session
end

Instance Method Details

#read_next_eventObject

Read the next event from the socket



161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/observability/collector/rabbitmq.rb', line 161

def read_next_event
	self.log.debug "Reading next event."
	data = @socket.recv_nonblock( MAX_EVENT_BYTES, exception: false )

	if data == :wait_readable
		IO.select( [@socket], nil, nil, LOOP_TIMER )
		return nil
	elsif data.empty?
		return nil
	else
		self.log.info "Read %d bytes" % [ data.bytesize ]
		return JSON.parse( data )
	end
end

#startObject

Start receiving events.



124
125
126
127
128
129
130
131
# File 'lib/observability/collector/rabbitmq.rb', line 124

def start
	self.log.info "Starting up."

	@amqp_session = self.class.configured_amqp_session
	@socket.bind( self.class.host, self.class.port )

	self.start_processing
end

#start_processingObject

Start consuming incoming events and storing them.



144
145
146
147
148
149
150
151
# File 'lib/observability/collector/rabbitmq.rb', line 144

def start_processing
	@processing = true
	while @processing
		event = self.read_next_event or next
		self.log.debug "Read event: %p" % [ event ]
		self.store_event( event )
	end
end

#stopObject

Stop receiving events.



135
136
137
138
139
140
# File 'lib/observability/collector/rabbitmq.rb', line 135

def stop
	self.stop_processing

	@socket.shutdown( :SHUT_RDWR )
	@amqp_session.close
end

#stop_processingObject

Stop consuming events.



155
156
157
# File 'lib/observability/collector/rabbitmq.rb', line 155

def stop_processing
	@processing = false
end

#store_event(event) ⇒ Object

Store the specified event.



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/observability/collector/rabbitmq.rb', line 178

def store_event( event )
	time    = event.delete( '@timestamp' )
	type    = event.delete( '@type' )
	version = event.delete( '@version' )

	data = JSON.generate( event )
	headers = {
		time: time,
		type: type,
		version: version,
		content_type: 'application/json',
		content_encoding: data.encoding.name,
		timestamp: Time.now.to_f,
	}

	@amqp_exchange.value.publish( data, headers )
end