Class: Observability::Collector::RabbitMQ

Inherits:
Observability::Collector show all
Extended by:
Configurability, Loggability
Defined in:
lib/observability/collector/rabbitmq.rb

Overview

A collector that re-injects events over AMQP to a RabbitMQ cluster.

Constant Summary collapse

MAX_EVENT_BYTES =

The maximum size of event messages

64 * 1024
LOOP_TIMER =

The number of seconds to wait between IO loops

0.25
DEFAULT_PUBLISH_OPTIONS =

Default options for publication

{
  mandatory:  false,
  persistent: true
}

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Observability::Collector

configured_type, inherited, start

Constructor Details

#initialize ⇒ RabbitMQ

Create a new RabbitMQ collector that receives events on a UDP socket.



106
107
108
109
110
111
112
113
114
115
116
# File 'lib/observability/collector/rabbitmq.rb', line 106

# Set up the collector's initial state. Nothing is connected yet: the
# UDP socket is created unbound and the AMQP session is left unset until
# #start assigns it.
def initialize
  super

  @processing   = false
  @amqp_session = nil
  @socket       = UDPSocket.new

  # The channel and exchange are held in thread-local variables, so each
  # thread that publishes works with its own pair. Both blocks read
  # @amqp_session when they run, so they must not be evaluated before the
  # session has been established.
  @amqp_channel = Concurrent::ThreadLocalVar.new do
    @amqp_session.create_channel
  end

  # NOTE(review): `passive: true` presumably means the headers exchange
  # must already exist on the broker rather than being declared here --
  # confirm against the Bunny documentation.
  @amqp_exchange = Concurrent::ThreadLocalVar.new {
    @amqp_channel.value.headers( self.class.exchange, passive: true )
  }
end

Class Method Details

.amqp_session_options ⇒ Object

Fetch a Hash of AMQP options.



67
68
69
70
71
72
73
74
75
# File 'lib/observability/collector/rabbitmq.rb', line 67

# Build the options Hash handed to the AMQP session constructor from the
# class-level settings, plus the logger looked up for Observability.
def self::amqp_session_options
  options = {}

  options[:logger]    = Loggability[ Observability ]
  options[:heartbeat] = self.heartbeat
  # NOTE(review): :exchange is included alongside the connection-level
  # settings; confirm whether the session actually consumes it.
  options[:exchange]  = self.exchange
  options[:vhost]     = self.vhost
  options[:threaded]  = self.threaded

  return options
end

.capabilities_list(server_info) ⇒ Object

Return a formatted list of the server’s capabilities listed in server_info.



79
80
81
82
83
# File 'lib/observability/collector/rabbitmq.rb', line 79

# Return a comma-separated String naming every capability in +server_info+
# whose value is truthy. +server_info+ is expected to yield name/flag
# pairs (e.g. a Hash); a nil +server_info+ -- a server that advertised no
# capabilities at all -- yields an empty String instead of raising.
def self::capabilities_list( server_info )
  enabled_names = ( server_info || {} ).map do |name, enabled|
    enabled ? name : nil
  end

  return enabled_names.compact.join( ', ' )
end

.configured_amqp_session ⇒ Object

Establish the connection to RabbitMQ based on the loaded configuration.



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/observability/collector/rabbitmq.rb', line 87

# Create and start a Bunny session for the configured broker_uri, log the
# product, version and capabilities the server reports, and return the
# started session. Raises a RuntimeError if no broker_uri is configured.
def self::configured_amqp_session
  uri = self.broker_uri
  raise "No broker_uri configured." unless uri

  session = Bunny.new( uri, self.amqp_session_options )
  session.start

  properties = session.server_properties
  capabilities = self.capabilities_list( properties['capabilities'] )
  self.log.info "Connected to %s v%s server: %s" %
    [ properties['product'], properties['version'], capabilities ]

  return session
end

Instance Method Details

#read_next_event ⇒ Object

Read the next event from the socket



161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/observability/collector/rabbitmq.rb', line 161

# Try to read one event from the socket without blocking and return it
# parsed from JSON.
#
# Returns nil when there is nothing to process: no datagram was waiting
# (after pausing up to LOOP_TIMER seconds for the socket to become
# readable), the datagram was empty, or its payload was not valid JSON.
# Malformed payloads are logged and discarded so that a single bad packet
# cannot raise out of the processing loop.
def read_next_event
  self.log.debug "Reading next event."
  data = @socket.recv_nonblock( MAX_EVENT_BYTES, exception: false )

  if data == :wait_readable
    # Nothing queued; wait briefly so the caller's loop doesn't spin.
    IO.select( [@socket], nil, nil, LOOP_TIMER )
    return nil
  end

  # NOTE(review): nil is tolerated defensively in case the socket reports
  # end-of-input that way instead of with an empty String.
  return nil if data.nil? || data.empty?

  self.log.info "Read %d bytes" % [ data.bytesize ]
  return JSON.parse( data )
rescue JSON::ParserError => err
  self.log.error "Discarding malformed event: %s" % [ err.message ]
  return nil
end

#start ⇒ Object

Start receiving events.



124
125
126
127
128
129
130
131
# File 'lib/observability/collector/rabbitmq.rb', line 124

# Bring the collector up: establish the AMQP session, bind the UDP socket
# to the configured host and port, then enter the processing loop. The
# call does not return until #start_processing does.
def start
  self.log.info( "Starting up." )

  # The session is established before the socket is bound.
  @amqp_session = self.class.configured_amqp_session

  listen_host = self.class.host
  listen_port = self.class.port
  @socket.bind( listen_host, listen_port )

  self.start_processing
end

#start_processing ⇒ Object

Start consuming incoming events and storing them.



144
145
146
147
148
149
150
151
# File 'lib/observability/collector/rabbitmq.rb', line 144

# Run the event loop: keep reading events and handing each one to
# #store_event until the @processing flag is cleared (see
# #stop_processing). Iterations in which no event was read are skipped.
def start_processing
  @processing = true

  while @processing
    event = self.read_next_event

    if event
      self.log.debug "Read event: %p" % [ event ]
      self.store_event( event )
    end
  end
end

#stop ⇒ Object

Stop receiving events.



135
136
137
138
139
140
# File 'lib/observability/collector/rabbitmq.rb', line 135

# Shut the collector down: stop the processing loop, shut down the UDP
# socket, and close the AMQP session.
#
# The session is closed from an +ensure+ so that it is released even if
# shutting down the socket raises, and the close is skipped when no
# session was ever established (i.e. #stop called before #start).
def stop
  self.stop_processing

  begin
    @socket.shutdown( :SHUT_RDWR )
  rescue Errno::ENOTCONN
    # The socket is only ever bound, never connected, so the OS may report
    # "not connected" here; there is nothing further to tear down.
  end
ensure
  @amqp_session&.close
end

#stop_processing ⇒ Object

Stop consuming events.



155
156
157
# File 'lib/observability/collector/rabbitmq.rb', line 155

# Clear the flag that keeps the #start_processing loop running; the loop
# exits the next time it checks the flag. Returns false.
def stop_processing
  @processing = false
end

#store_event(event) ⇒ Object

Store the specified event.



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/observability/collector/rabbitmq.rb', line 178

# Publish +event+ (a Hash) to the current thread's AMQP exchange.
#
# The '@timestamp', '@type' and '@version' entries are removed from
# +event+ (the Hash is modified in place) and sent as publish options;
# whatever remains is serialized to JSON and used as the message body.
# Returns the result of the exchange's #publish call.
def store_event( event )
  time, type, version =
    %w[@timestamp @type @version].map {|key| event.delete(key) }

  payload = JSON.generate( event )

  # NOTE(review): :time and :version are not standard AMQP message
  # properties; confirm they reach consumers as intended (custom values
  # are usually carried under a :headers option).
  options = { time: time, type: type, version: version }
  options[:content_type]     = 'application/json'
  options[:content_encoding] = payload.encoding.name
  options[:timestamp]        = Time.now.to_f

  @amqp_exchange.value.publish( payload, options )
end