Class: Kafka::Datadog::ConsumerSubscriber
- Inherits: StatsdSubscriber
  - Object
    - ActiveSupport::Subscriber
      - StatsdSubscriber
        - Kafka::Datadog::ConsumerSubscriber
- Defined in: lib/kafka/datadog.rb
Instance Method Summary
Instance Method Details
#process_batch(event) ⇒ Object
# File 'lib/kafka/datadog.rb', line 111

def process_batch(event)
  messages = event.payload.fetch(:message_count)

  tags = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id),
    topic: event.payload.fetch(:topic),
    partition: event.payload.fetch(:partition),
  }

  if event.payload.key?(:exception)
    increment("consumer.process_batch.errors", tags: tags)
  else
    timing("consumer.process_batch.latency", event.duration, tags: tags)
    count("consumer.messages", messages, tags: tags)
  end
end
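The branching in process_batch can be tried without a StatsD agent. The sketch below is an illustration under stated assumptions, not the library's own test setup: `FakeBatchEvent`, `RecordingBatchSubscriber`, and `BATCH_PAYLOAD` are hypothetical names, the payload values are made up, and the `increment`, `timing`, and `count` helpers are simple recorders standing in for whatever StatsdSubscriber really does.

```ruby
# Hypothetical stand-in for the instrumentation event handed to the subscriber.
FakeBatchEvent = Struct.new(:payload, :duration)

# Hypothetical recorder: same branching as process_batch, but metric calls
# are appended to an array instead of being sent to a StatsD agent.
class RecordingBatchSubscriber
  attr_reader :calls

  def initialize
    @calls = []
  end

  def process_batch(event)
    messages = event.payload.fetch(:message_count)

    tags = {
      client: event.payload.fetch(:client_id),
      group_id: event.payload.fetch(:group_id),
      topic: event.payload.fetch(:topic),
      partition: event.payload.fetch(:partition),
    }

    if event.payload.key?(:exception)
      increment("consumer.process_batch.errors", tags: tags)
    else
      timing("consumer.process_batch.latency", event.duration, tags: tags)
      count("consumer.messages", messages, tags: tags)
    end
  end

  private

  # Each helper records [kind, metric name, value, tags].
  def increment(name, tags:)
    @calls << [:increment, name, 1, tags]
  end

  def timing(name, value, tags:)
    @calls << [:timing, name, value, tags]
  end

  def count(name, value, tags:)
    @calls << [:count, name, value, tags]
  end
end

# Example payload; every value here is invented for illustration.
BATCH_PAYLOAD = {
  client_id: "billing-service",
  group_id: "invoices",
  topic: "orders",
  partition: 0,
  message_count: 25,
}.freeze

succeeded = RecordingBatchSubscriber.new
succeeded.process_batch(FakeBatchEvent.new(BATCH_PAYLOAD, 12.5))

failed = RecordingBatchSubscriber.new
failed_payload = BATCH_PAYLOAD.merge(exception: ["RuntimeError", "boom"])
failed.process_batch(FakeBatchEvent.new(failed_payload, 12.5))

# Print the recorded metric calls for both runs.
(succeeded.calls + failed.calls).each do |kind, name, value, _tags|
  puts "#{kind} #{name} #{value}"
end
```

A successful batch records a latency timing plus a message count, while a batch whose payload carries an :exception key records only the error increment.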
#process_message(event) ⇒ Object
# File 'lib/kafka/datadog.rb', line 91

def process_message(event)
  lag = event.payload.fetch(:offset_lag)

  tags = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id),
    topic: event.payload.fetch(:topic),
    partition: event.payload.fetch(:partition),
  }

  if event.payload.key?(:exception)
    increment("consumer.process_message.errors", tags: tags)
  else
    timing("consumer.process_message.latency", event.duration, tags: tags)
    increment("consumer.messages", tags: tags)
  end

  gauge("consumer.lag", lag, tags: tags)
end
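Two details of process_message are easy to miss: the lag gauge is emitted whether or not the message failed, and every payload key is read with `fetch`, so a missing key raises KeyError. The sketch below illustrates both. It assumes a hypothetical helper, `metric_names_for_message`, and a made-up `MESSAGE_PAYLOAD`; neither is part of the library, and the helper only lists metric names rather than sending anything.

```ruby
# Hypothetical helper, not part of the library: lists the metric names that
# the process_message logic would emit for a given event payload.
def metric_names_for_message(payload)
  # fetch raises KeyError when a required key is missing, as in the method.
  %i[offset_lag client_id group_id topic partition].each do |key|
    payload.fetch(key)
  end

  names =
    if payload.key?(:exception)
      ["consumer.process_message.errors"]
    else
      ["consumer.process_message.latency", "consumer.messages"]
    end

  # The lag gauge sits outside the if/else, so it is reported in both cases.
  names + ["consumer.lag"]
end

# Example payload; every value here is invented for illustration.
MESSAGE_PAYLOAD = {
  client_id: "billing-service",
  group_id: "invoices",
  topic: "orders",
  partition: 0,
  offset_lag: 3,
}.freeze

puts metric_names_for_message(MESSAGE_PAYLOAD).inspect
puts metric_names_for_message(MESSAGE_PAYLOAD.merge(exception: ["RuntimeError", "boom"])).inspect

begin
  # Dropping :offset_lag makes the first fetch fail.
  metric_names_for_message(MESSAGE_PAYLOAD.reject { |key, _| key == :offset_lag })
rescue KeyError => e
  puts "KeyError: #{e.message}"
end
```

Because the gauge is unconditional, consumer lag keeps being reported for a partition even while its messages are failing.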