Class: Kafka::Prometheus::ConnectionSubscriber

Inherits:
ActiveSupport::Subscriber
  • Object
show all
Defined in:
lib/kafka/prometheus.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ ConnectionSubscriber

Returns a new instance of ConnectionSubscriber


43
44
45
46
47
48
49
50
# File 'lib/kafka/prometheus.rb', line 43

# Sets up the subscriber and registers the connection-level metrics on
# Prometheus.registry: two counters (:api_calls, :api_errors) and three
# histograms (:api_latency, :api_request_size, :api_response_size).
#
# Each histogram is registered with an empty hash as its third argument and a
# bucket constant (LATENCY_BUCKETS or SIZE_BUCKETS) as its fourth.
# NOTE(review): the empty hash is presumably the base label set -- confirm
# against the prometheus client version in use.
def initialize
  super

  registry = Prometheus.registry

  @api_calls = registry.counter(:api_calls, 'Total calls')
  @api_latency = registry.histogram(
    :api_latency, 'Latency', {}, LATENCY_BUCKETS
  )
  @api_request_size = registry.histogram(
    :api_request_size, 'Request size', {}, SIZE_BUCKETS
  )
  @api_response_size = registry.histogram(
    :api_response_size, 'Response size', {}, SIZE_BUCKETS
  )
  @api_errors = registry.counter(:api_errors, 'Errors')
end

Instance Method Details

#request(event) ⇒ Object


52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/kafka/prometheus.rb', line 52

# Records metrics for a single request event.
#
# Labels are built from the event payload: :client_id and :broker_host are
# required (a missing key raises KeyError before any metric is touched), while
# :api falls back to 'unknown'. :request_size and :response_size default to 0.
#
# @param event [#payload, #duration] event whose payload is a Hash-like object
#   responding to #fetch and #key?
# @raise [KeyError] when the payload lacks :client_id or :broker_host
def request(event)
  payload = event.payload

  labels = {
    client: payload.fetch(:client_id),
    api: payload.fetch(:api, 'unknown'),
    broker: payload.fetch(:broker_host)
  }
  bytes_sent = payload.fetch(:request_size, 0)
  bytes_received = payload.fetch(:response_size, 0)

  @api_calls.increment(labels)

  # Observation order matches the metric registration order.
  [
    [@api_latency, event.duration],
    [@api_request_size, bytes_sent],
    [@api_response_size, bytes_received]
  ].each { |histogram, value| histogram.observe(labels, value) }

  # The presence of the :exception key (regardless of its value) marks a failure.
  @api_errors.increment(labels) if payload.key?(:exception)
end