Class: Kafka::Datadog::ConsumerSubscriber

Inherits:
StatsdSubscriber
  • Object
show all
Defined in:
lib/kafka/datadog.rb

Instance Method Summary collapse

Instance Method Details

#process_batch(event) ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/kafka/datadog.rb', line 111

def process_batch(event)
  messages = event.payload.fetch(:message_count)

  tags = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id),
    topic: event.payload.fetch(:topic),
    partition: event.payload.fetch(:partition),
  }

  if event.payload.key?(:exception)
    increment("consumer.process_batch.errors", tags: tags)
  else
    timing("consumer.process_batch.latency", event.duration, tags: tags)
    count("consumer.messages", messages, tags: tags)
  end
end

#process_message(event) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/kafka/datadog.rb', line 91

def process_message(event)
  lag = event.payload.fetch(:offset_lag)

  tags = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id),
    topic: event.payload.fetch(:topic),
    partition: event.payload.fetch(:partition),
  }

  if event.payload.key?(:exception)
    increment("consumer.process_message.errors", tags: tags)
  else
    timing("consumer.process_message.latency", event.duration, tags: tags)
    increment("consumer.messages", tags: tags)
  end

  gauge("consumer.lag", lag, tags: tags)
end