Class: Kafka::Datadog::ConsumerSubscriber

Inherits: StatsdSubscriber (< Object)

Defined in: lib/kafka/datadog.rb
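
All metrics below are reported through a Datadog StatsD client. A minimal setup sketch, assuming the configuration accessors described in the ruby-kafka README (namespace, host, port): requiring kafka/datadog attaches this subscriber, and the consumer metrics are then sent to the configured agent.

require "kafka/datadog"

# Assumed configuration accessors from the ruby-kafka README;
# verify the exact names against your gem version.
Kafka::Datadog.namespace = "my_app.kafka"  # metric prefix (default "ruby_kafka")
Kafka::Datadog.host      = "127.0.0.1"     # StatsD agent host
Kafka::Datadog.port      = 8125            # StatsD agent port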

Instance Method Summary

Instance Method Details

#fetch_batch(event) ⇒ Object



# File 'lib/kafka/datadog.rb', line 196

def fetch_batch(event)
  lag = event.payload.fetch(:offset_lag)
  batch_size = event.payload.fetch(:message_count)

  tags = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id),
    topic: event.payload.fetch(:topic),
    partition: event.payload.fetch(:partition),
  }

  histogram("consumer.batch_size", batch_size, tags: tags)
  gauge("consumer.lag", lag, tags: tags)
end
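
These handlers are driven by ActiveSupport::Notifications events emitted internally by ruby-kafka. A sketch of an event that would exercise #fetch_batch, assuming the subscriber is attached to the consumer.kafka namespace (the emission below is illustrative; applications do not normally instrument these events themselves):

require "active_support/notifications"

# Hypothetical emission; the payload keys mirror the fetches above.
ActiveSupport::Notifications.instrument(
  "fetch_batch.consumer.kafka",
  client_id: "my-client",
  group_id: "my-group",
  topic: "events",
  partition: 0,
  offset_lag: 42,      # becomes the consumer.lag gauge
  message_count: 100   # becomes the consumer.batch_size histogram
)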

#join_group(event) ⇒ Object



# File 'lib/kafka/datadog.rb', line 211

def join_group(event)
  tags = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id),
  }

  timing("consumer.join_group", event.duration, tags: tags)

  if event.payload.key?(:exception)
    increment("consumer.join_group.errors", tags: tags)
  end
end
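
The exception check relies on standard ActiveSupport::Notifications behaviour: when the block passed to instrument raises, the instrumenter records the error under the :exception payload key before re-raising, so the errors counter is incremented in addition to the timing. A minimal illustration (hypothetical event emission):

require "active_support/notifications"

begin
  ActiveSupport::Notifications.instrument("join_group.consumer.kafka", client_id: "c", group_id: "g") do
    raise "rebalance failed"  # simulated failure inside the instrumented block
  end
rescue RuntimeError
  # The payload now carries :exception, so the subscriber would also
  # increment consumer.join_group.errors.
end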

#leave_group(event) ⇒ Object



# File 'lib/kafka/datadog.rb', line 237

def leave_group(event)
  tags = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id),
  }

  timing("consumer.leave_group", event.duration, tags: tags)

  if event.payload.key?(:exception)
    increment("consumer.leave_group.errors", tags: tags)
  end
end

#loop(event) ⇒ Object



# File 'lib/kafka/datadog.rb', line 250

def loop(event)
  tags = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id),
  }

  histogram("consumer.loop.duration", event.duration, tags: tags)
end

#pause_status(event) ⇒ Object



# File 'lib/kafka/datadog.rb', line 259

def pause_status(event)
  tags = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id),
    topic: event.payload.fetch(:topic),
    partition: event.payload.fetch(:partition),
  }

  duration = event.payload.fetch(:duration)

  gauge("consumer.pause.duration", duration, tags: tags)
end

#process_batch(event) ⇒ Object



# File 'lib/kafka/datadog.rb', line 169

def process_batch(event)
  offset = event.payload.fetch(:last_offset)
  messages = event.payload.fetch(:message_count)
  create_time = event.payload.fetch(:last_create_time)
  time_lag = create_time && ((Time.now - create_time) * 1000).to_i

  tags = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id),
    topic: event.payload.fetch(:topic),
    partition: event.payload.fetch(:partition),
  }

  if event.payload.key?(:exception)
    increment("consumer.process_batch.errors", tags: tags)
  else
    timing("consumer.process_batch.latency", event.duration, tags: tags)
    count("consumer.messages", messages, tags: tags)
  end

  gauge("consumer.offset", offset, tags: tags)

  if time_lag
    gauge("consumer.time_lag", time_lag, tags: tags)
  end
end
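
time_lag is the age of the newest message in the batch, in whole milliseconds; the guard skips the gauge when the batch carried no create timestamp. For example, a last_create_time of 0.25 seconds ago yields a consumer.time_lag of roughly 250:

create_time = Time.now - 0.25
((Time.now - create_time) * 1000).to_i  # => ~250 (milliseconds)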

#process_message(event) ⇒ Object



# File 'lib/kafka/datadog.rb', line 140

def process_message(event)
  offset = event.payload.fetch(:offset)
  offset_lag = event.payload.fetch(:offset_lag)
  create_time = event.payload.fetch(:create_time)
  time_lag = create_time && ((Time.now - create_time) * 1000).to_i

  tags = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id),
    topic: event.payload.fetch(:topic),
    partition: event.payload.fetch(:partition),
  }

  if event.payload.key?(:exception)
    increment("consumer.process_message.errors", tags: tags)
  else
    timing("consumer.process_message.latency", event.duration, tags: tags)
    increment("consumer.messages", tags: tags)
  end

  gauge("consumer.offset", offset, tags: tags)
  gauge("consumer.lag", offset_lag, tags: tags)

  # Not all messages have timestamps.
  if time_lag
    gauge("consumer.time_lag", time_lag, tags: tags)
  end
end

#sync_group(event) ⇒ Object



# File 'lib/kafka/datadog.rb', line 224

def sync_group(event)
  tags = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id),
  }

  timing("consumer.sync_group", event.duration, tags: tags)

  if event.payload.key?(:exception)
    increment("consumer.sync_group.errors", tags: tags)
  end
end