Class: Kafka::Prometheus::AsyncProducerSubscriber

Inherits:
ActiveSupport::Subscriber
  • Object
show all
Defined in:
lib/kafka/prometheus.rb

Instance Method Summary collapse

Constructor Details

#initializeAsyncProducerSubscriber

Returns a new instance of AsyncProducerSubscriber


264
265
266
267
268
269
270
# File 'lib/kafka/prometheus.rb', line 264

def initialize
  super
  @queue_size = Prometheus.registry.histogram(:async_producer_queue_size, 'Queue size', {}, SIZE_BUCKETS)
  @queue_fill_ratio = Prometheus.registry.histogram(:async_producer_queue_fill_ratio, 'Queue fill ratio')
  @produce_errors = Prometheus.registry.counter(:async_producer_produce_errors, 'Producer errors')
  @dropped_messages = Prometheus.registry.counter(:async_producer_dropped_messages, 'Dropped messages')
end

Instance Method Details

#buffer_overflow(event) ⇒ Object


286
287
288
289
# File 'lib/kafka/prometheus.rb', line 286

def buffer_overflow(event)
  key = { client: event.payload.fetch(:client_id), topic: event.payload.fetch(:topic) }
  @produce_errors.increment(key)
end

#drop_messages(event) ⇒ Object


291
292
293
294
295
296
# File 'lib/kafka/prometheus.rb', line 291

def drop_messages(event)
  key = { client: event.payload.fetch(:client_id) }
  message_count = event.payload.fetch(:message_count)

  @dropped_messages.increment(key, message_count)
end

#enqueue_message(event) ⇒ Object


272
273
274
275
276
277
278
279
280
281
282
283
284
# File 'lib/kafka/prometheus.rb', line 272

def enqueue_message(event)
  key = { client: event.payload.fetch(:client_id), topic: event.payload.fetch(:topic) }

  queue_size = event.payload.fetch(:queue_size)
  max_queue_size = event.payload.fetch(:max_queue_size)
  queue_fill_ratio = queue_size.to_f / max_queue_size.to_f

  # This gets us the avg/max queue size per producer.
  @queue_size.observe(key, queue_size)

  # This gets us the avg/max queue fill ratio per producer.
  @queue_fill_ratio.observe(key, queue_fill_ratio)
end