Class: Observability::Collector::RabbitMQ
- Inherits:
-
Observability::Collector
- Object
- Observability::Collector
- Observability::Collector::RabbitMQ
- Extended by:
- Configurability, Loggability
- Defined in:
- lib/observability/collector/rabbitmq.rb
Overview
A collector that re-injects events over AMQP to a RabbitMQ cluster.
Constant Summary collapse
- MAX_EVENT_BYTES =
The maximum size of event messages
64 * 1024
- LOOP_TIMER =
The number of seconds to wait between IO loops
0.25- DEFAULT_PUBLISH_OPTIONS =
Default options for publication
{ mandatory: false, persistent: true }
Class Method Summary collapse
-
.amqp_session_options ⇒ Object
Fetch a Hash of AMQP options.
-
.capabilities_list(server_info) ⇒ Object
Return a formatted list of the server’s capabilities listed in
server_info. -
.configured_amqp_session ⇒ Object
Establish the connection to RabbitMQ based on the loaded configuration.
Instance Method Summary collapse
-
#initialize ⇒ RabbitMQ
constructor
Create a new UDP collector.
-
#read_next_event ⇒ Object
Read the next event from the socket.
-
#start ⇒ Object
Start receiving events.
-
#start_processing ⇒ Object
Start consuming incoming events and storing them.
-
#stop ⇒ Object
Stop receiving events.
-
#stop_processing ⇒ Object
Stop consuming events.
-
#store_event(event) ⇒ Object
Store the specified
event.
Methods inherited from Observability::Collector
configured_type, inherited, start
Constructor Details
#initialize ⇒ RabbitMQ
Create a new UDP collector
106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/observability/collector/rabbitmq.rb', line 106 def initialize super @socket = UDPSocket.new @amqp_session = nil @amqp_channel = Concurrent::ThreadLocalVar.new { @amqp_session.create_channel } @amqp_exchange = Concurrent::ThreadLocalVar.new do @amqp_channel.value.headers( self.class.exchange, passive: true ) end @processing = false end |
Class Method Details
.amqp_session_options ⇒ Object
Fetch a Hash of AMQP options.
67 68 69 70 71 72 73 74 75 |
# File 'lib/observability/collector/rabbitmq.rb', line 67 def self:: return { logger: Loggability[ Observability ], heartbeat: self.heartbeat, exchange: self.exchange, vhost: self.vhost, threaded: self.threaded, } end |
.capabilities_list(server_info) ⇒ Object
Return a formatted list of the server’s capabilities listed in server_info.
79 80 81 82 83 |
# File 'lib/observability/collector/rabbitmq.rb', line 79 def self::capabilities_list( server_info ) server_info. map {|name,enabled| enabled ? name : nil }. compact.join(', ') end |
.configured_amqp_session ⇒ Object
Establish the connection to RabbitMQ based on the loaded configuration.
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/observability/collector/rabbitmq.rb', line 87 def self::configured_amqp_session uri = self.broker_uri or raise "No broker_uri configured." = self. session = Bunny.new( uri, ) session.start self.log.info "Connected to %s v%s server: %s" % [ session.server_properties['product'], session.server_properties['version'], self.capabilities_list( session.server_properties['capabilities'] ), ] return session end |
Instance Method Details
#read_next_event ⇒ Object
Read the next event from the socket
161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
# File 'lib/observability/collector/rabbitmq.rb', line 161 def read_next_event self.log.debug "Reading next event." data = @socket.recv_nonblock( MAX_EVENT_BYTES, exception: false ) if data == :wait_readable IO.select( [@socket], nil, nil, LOOP_TIMER ) return nil elsif data.empty? return nil else self.log.info "Read %d bytes" % [ data.bytesize ] return JSON.parse( data ) end end |
#start ⇒ Object
Start receiving events.
124 125 126 127 128 129 130 131 |
# File 'lib/observability/collector/rabbitmq.rb', line 124 def start self.log.info "Starting up." @amqp_session = self.class.configured_amqp_session @socket.bind( self.class.host, self.class.port ) self.start_processing end |
#start_processing ⇒ Object
Start consuming incoming events and storing them.
144 145 146 147 148 149 150 151 |
# File 'lib/observability/collector/rabbitmq.rb', line 144 def start_processing @processing = true while @processing event = self.read_next_event or next self.log.debug "Read event: %p" % [ event ] self.store_event( event ) end end |
#stop ⇒ Object
Stop receiving events.
135 136 137 138 139 140 |
# File 'lib/observability/collector/rabbitmq.rb', line 135 def stop self.stop_processing @socket.shutdown( :SHUT_RDWR ) @amqp_session.close end |
#stop_processing ⇒ Object
Stop consuming events.
155 156 157 |
# File 'lib/observability/collector/rabbitmq.rb', line 155 def stop_processing @processing = false end |
#store_event(event) ⇒ Object
Store the specified event.
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 |
# File 'lib/observability/collector/rabbitmq.rb', line 178 def store_event( event ) time = event.delete( '@timestamp' ) type = event.delete( '@type' ) version = event.delete( '@version' ) data = JSON.generate( event ) headers = { time: time, type: type, version: version, content_type: 'application/json', content_encoding: data.encoding.name, timestamp: Time.now.to_f, } @amqp_exchange.value.publish( data, headers ) end |