Class: Kafka::Statsd::ConsumerSubscriber
- Inherits:
- StatsdSubscriber
- Object
- ActiveSupport::Subscriber
- StatsdSubscriber
- Kafka::Statsd::ConsumerSubscriber
- Defined in:
- lib/kafka/statsd.rb
Instance Method Summary
Instance Method Details
#process_batch(event) ⇒ Object
99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/kafka/statsd.rb', line 99
#
# Reports metrics for a batch-processing instrumentation event.
#
# Reads :message_count, :client_id, :group_id, :topic and :partition from
# the event payload and builds metric names under
# "consumer.<client>.<group_id>.<topic>.<partition>".
#
# * When the payload has an :exception key, only the
#   "process_batch.errors" counter is incremented.
# * Otherwise the event duration is reported as "process_batch.latency"
#   and the batch's message count is added to "messages".
#
# NOTE(review): `increment`, `timing` and `count` are not defined in this
# listing; presumably inherited from StatsdSubscriber -- confirm.
#
# @param event [#payload, #duration] instrumentation event; presumably an
#   ActiveSupport::Notifications::Event -- confirm against the subscriber
#   wiring.
# @return [Object] whatever the last metric helper call returns.
def process_batch(event)
  payload = event.payload

  # Every key is fetched up front, so a payload missing any of them fails
  # before a metric is emitted (KeyError, assuming payload is a Hash).
  message_count = payload.fetch(:message_count)
  client = payload.fetch(:client_id)
  group_id = payload.fetch(:group_id)
  topic = payload.fetch(:topic)
  partition = payload.fetch(:partition)

  prefix = "consumer.#{client}.#{group_id}.#{topic}.#{partition}"

  if payload.key?(:exception)
    increment("#{prefix}.process_batch.errors")
  else
    timing("#{prefix}.process_batch.latency", event.duration)
    count("#{prefix}.messages", message_count)
  end
end
#process_message(event) ⇒ Object
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/kafka/statsd.rb', line 82
#
# Reports metrics for a single-message processing instrumentation event.
#
# Reads :offset_lag, :client_id, :group_id, :topic and :partition from the
# event payload and builds metric names under
# "consumer.<client>.<group_id>.<topic>.<partition>".
#
# * When the payload has an :exception key, the
#   "process_message.errors" counter is incremented.
# * Otherwise the event duration is reported as "process_message.latency"
#   and the "messages" counter is incremented.
# * In both cases the offset lag is reported as the "lag" gauge.
#
# NOTE(review): `increment`, `timing` and `gauge` are not defined in this
# listing; presumably inherited from StatsdSubscriber -- confirm.
#
# @param event [#payload, #duration] instrumentation event; presumably an
#   ActiveSupport::Notifications::Event -- confirm against the subscriber
#   wiring.
# @return [Object] whatever the `gauge` helper returns.
def process_message(event)
  payload = event.payload

  # Every key is fetched up front, so a payload missing any of them fails
  # before a metric is emitted (KeyError, assuming payload is a Hash).
  lag = payload.fetch(:offset_lag)
  client = payload.fetch(:client_id)
  group_id = payload.fetch(:group_id)
  topic = payload.fetch(:topic)
  partition = payload.fetch(:partition)

  prefix = "consumer.#{client}.#{group_id}.#{topic}.#{partition}"

  if payload.key?(:exception)
    increment("#{prefix}.process_message.errors")
  else
    timing("#{prefix}.process_message.latency", event.duration)
    increment("#{prefix}.messages")
  end

  # Lag is reported regardless of whether processing succeeded.
  gauge("#{prefix}.lag", lag)
end