Class: Kafka::Prometheus::ProducerSubscriber

Inherits:
ActiveSupport::Subscriber
  • Object
show all
Defined in:
lib/kafka/prometheus.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ ProducerSubscriber

Returns a new instance of ProducerSubscriber.



185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/kafka/prometheus.rb', line 185

# Builds the subscriber and registers every producer metric it reports.
#
# Counters and histograms are created on the registry exposed by
# Prometheus.registry and kept in instance variables for the event handlers.
# Size histograms use SIZE_BUCKETS, latency/delay histograms use
# LATENCY_BUCKETS, and the remaining histograms use the registry's defaults.
def initialize
  super

  # NOTE(review): assumes Prometheus.registry is a plain accessor, so reading
  # it once is equivalent to reading it for each metric.
  registry = Prometheus.registry

  # Metrics recorded when a message is produced into the buffer.
  @produce_messages = registry.counter(:producer_produced_messages, 'Produced messages total')
  @produce_message_size = registry.histogram(:producer_message_size, 'Message size', {}, SIZE_BUCKETS)
  @buffer_size = registry.histogram(:producer_buffer_size, 'Buffer size', {}, SIZE_BUCKETS)
  @buffer_fill_ratio = registry.histogram(:producer_buffer_fill_ratio, 'Buffer fill ratio')
  @buffer_fill_percentage = registry.histogram(:producer_buffer_fill_percentage, 'Buffer fill percentage')
  @produce_errors = registry.counter(:producer_produce_errors, 'Produce errors')

  # Metrics recorded when buffered messages are delivered.
  @deliver_errors = registry.counter(:producer_deliver_errors, 'Deliver error')
  @deliver_latency = registry.histogram(:producer_deliver_latency, 'Delivery latency', {}, LATENCY_BUCKETS)
  @deliver_messages = registry.counter(:producer_deliver_messages, 'Total count of delivered messages')
  @deliver_attempts = registry.histogram(:producer_deliver_attempts, 'Delivery attempts')

  # Metrics recorded for acknowledgements and per-topic errors.
  @ack_messages = registry.counter(:producer_ack_messages, 'Ack')
  @ack_delay = registry.histogram(:producer_ack_delay, 'Ack delay', {}, LATENCY_BUCKETS)
  @ack_errors = registry.counter(:producer_ack_errors, 'Ack errors')
end

Instance Method Details

#ack_message(event) ⇒ Object



246
247
248
249
250
251
252
253
254
# File 'lib/kafka/prometheus.rb', line 246

# Records an acknowledged message: bumps the ACK counter and observes the
# acknowledgement delay, both labelled by client and topic.
#
# @param event [#payload] event whose payload supplies :client_id, :topic
#   and :delay (each read with fetch, so a missing key raises)
def ack_message(event)
  payload = event.payload
  labels = { client: payload.fetch(:client_id), topic: payload.fetch(:topic) }

  # One more message ACK'd for this client/topic pair.
  @ack_messages.increment(labels)

  # :delay is read only after the counter has been bumped.
  @ack_delay.observe(labels, payload.fetch(:delay))
end

#buffer_overflow(event) ⇒ Object



226
227
228
229
# File 'lib/kafka/prometheus.rb', line 226

# Counts a buffer overflow as a produce error for the client/topic pair.
#
# @param event [#payload] event whose payload supplies :client_id and :topic
#   (each read with fetch, so a missing key raises)
def buffer_overflow(event)
  payload = event.payload
  labels = { client: payload.fetch(:client_id), topic: payload.fetch(:topic) }

  @produce_errors.increment(labels)
end

#deliver_messages(event) ⇒ Object



231
232
233
234
235
236
237
238
239
240
241
242
243
244
# File 'lib/kafka/prometheus.rb', line 231

# Records the outcome of a delivery, labelled by client only.
#
# Bumps the error counter when the payload carries an :exception key,
# observes the event duration as delivery latency, adds the delivered
# message count to the delivery counter, and observes the attempt count.
#
# @param event [#payload, #duration] event whose payload supplies
#   :client_id, :delivered_message_count and :attempts (each read with
#   fetch, so a missing key raises)
def deliver_messages(event)
  payload = event.payload
  labels = { client: payload.fetch(:client_id) }
  delivered = payload.fetch(:delivered_message_count)
  attempt_count = payload.fetch(:attempts)

  # Only the presence of the :exception key matters, not its value.
  if payload.key?(:exception)
    @deliver_errors.increment(labels)
  end

  @deliver_latency.observe(labels, event.duration)

  # How many messages this delivery handed over.
  @deliver_messages.increment(labels, delivered)

  # How many attempts the delivery took.
  @deliver_attempts.observe(labels, attempt_count)
end

#produce_message(event) ⇒ Object



204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/kafka/prometheus.rb', line 204

# Records a produced message.
#
# The message counter and message-size histogram are labelled by client and
# topic; the buffer size, fill ratio and fill percentage histograms are
# labelled by client only.
#
# @param event [#payload] event whose payload supplies :client_id, :topic,
#   :message_size, :buffer_size and :max_buffer_size (each read with fetch,
#   so a missing key raises)
def produce_message(event)
  payload = event.payload
  client_id = payload.fetch(:client_id)
  topic_labels = { client: client_id, topic: payload.fetch(:topic) }

  size = payload.fetch(:message_size)
  buffered = payload.fetch(:buffer_size)
  capacity = payload.fetch(:max_buffer_size)

  # Fill level as a 0..1 ratio and as the same value scaled by 100.
  fill_ratio = buffered.to_f / capacity.to_f
  fill_percentage = fill_ratio * 100.0

  # Per-topic write rate and message size.
  @produce_messages.increment(topic_labels)
  @produce_message_size.observe(topic_labels, size)

  # Per-client buffer size and fill level.
  client_labels = { client: client_id }
  @buffer_size.observe(client_labels, buffered)
  @buffer_fill_ratio.observe(client_labels, fill_ratio)
  @buffer_fill_percentage.observe(client_labels, fill_percentage)
end

#topic_error(event) ⇒ Object



256
257
258
259
260
# File 'lib/kafka/prometheus.rb', line 256

# Counts a topic error against the ACK error counter for the client/topic pair.
#
# @param event [#payload] event whose payload supplies :client_id and :topic
#   (each read with fetch, so a missing key raises)
def topic_error(event)
  payload = event.payload
  labels = { client: payload.fetch(:client_id), topic: payload.fetch(:topic) }

  @ack_errors.increment(labels)
end