Class: Karafka::Instrumentation::Vendors::Datadog::MetricsListener

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Core::Configurable
Defined in:
lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb

Overview

Note:

You need to setup the ‘dogstatsd-ruby` client and assign it

Listener that can be used to subscribe to Karafka to receive stats via StatsD and/or Datadog

Defined Under Namespace

Classes: RdKafkaMetric

Instance Method Summary collapse

Constructor Details

#initialize(&block) ⇒ MetricsListener

Returns a new instance of MetricsListener.

Parameters:

  • block (Proc)

    configuration block



59
60
61
62
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 59

def initialize(&block)
  configure
  setup(&block) if block
end

Instance Method Details

#on_connection_listener_fetch_loop_received(event) ⇒ Object

Reports how many messages we’ve polled and how much time did we spend on it

Parameters:

  • event (Karafka::Core::Monitoring::Event)


100
101
102
103
104
105
106
107
108
109
110
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 100

def on_connection_listener_fetch_loop_received(event)
  time_taken = event[:time]
  messages_count = event[:messages_buffer].size

  consumer_group_id = event[:subscription_group].consumer_group_id

  extra_tags = ["consumer_group:#{consumer_group_id}"]

  histogram('listener.polling.time_taken', time_taken, tags: default_tags + extra_tags)
  histogram('listener.polling.messages', messages_count, tags: default_tags + extra_tags)
end

#on_consumer_consumed(event) ⇒ Object

Here we report majority of things related to processing as we have access to the consumer

Parameters:

  • event (Karafka::Core::Monitoring::Event)


115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 115

def on_consumer_consumed(event)
  consumer = event.payload[:caller]
  messages = consumer.messages
   = messages.

  tags = default_tags + consumer_tags(consumer)

  count('consumer.messages', messages.count, tags: tags)
  count('consumer.batches', 1, tags: tags)
  gauge('consumer.offset', .last_offset, tags: tags)
  histogram('consumer.consumed.time_taken', event[:time], tags: tags)
  histogram('consumer.batch_size', messages.count, tags: tags)
  histogram('consumer.processing_lag', .processing_lag, tags: tags)
  histogram('consumer.consumption_lag', .consumption_lag, tags: tags)
end

#on_consumer_revoked(event) ⇒ Object

Parameters:

  • event (Karafka::Core::Monitoring::Event)


132
133
134
135
136
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 132

def on_consumer_revoked(event)
  tags = default_tags + consumer_tags(event.payload[:caller])

  count('consumer.revoked', 1, tags: tags)
end

#on_consumer_shutdown(event) ⇒ Object

Parameters:

  • event (Karafka::Core::Monitoring::Event)


139
140
141
142
143
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 139

def on_consumer_shutdown(event)
  tags = default_tags + consumer_tags(event.payload[:caller])

  count('consumer.shutdown', 1, tags: tags)
end

#on_error_occurred(event) ⇒ Object

Increases the errors count by 1

Parameters:

  • event (Karafka::Core::Monitoring::Event)


87
88
89
90
91
92
93
94
95
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 87

def on_error_occurred(event)
  extra_tags = ["type:#{event[:type]}"]

  if event.payload[:caller].respond_to?(:messages)
    extra_tags += consumer_tags(event.payload[:caller])
  end

  count('error_occurred', 1, tags: default_tags + extra_tags)
end

#on_statistics_emitted(event) ⇒ Object

Hooks up to WaterDrop instrumentation for emitted statistics

Parameters:

  • event (Karafka::Core::Monitoring::Event)


73
74
75
76
77
78
79
80
81
82
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 73

def on_statistics_emitted(event)
  statistics = event[:statistics]
  consumer_group_id = event[:consumer_group_id]

  base_tags = default_tags + ["consumer_group:#{consumer_group_id}"]

  rd_kafka_metrics.each do |metric|
    report_metric(metric, statistics, base_tags)
  end
end

#on_worker_process(event) ⇒ Object

Worker related metrics

Parameters:

  • event (Karafka::Core::Monitoring::Event)


147
148
149
150
151
152
153
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 147

def on_worker_process(event)
  jq_stats = event[:jobs_queue].statistics

  gauge('worker.total_threads', Karafka::App.config.concurrency, tags: default_tags)
  histogram('worker.processing', jq_stats[:busy], tags: default_tags)
  histogram('worker.enqueued_jobs', jq_stats[:enqueued], tags: default_tags)
end

#on_worker_processed(event) ⇒ Object

We report this metric before and after processing for higher accuracy Without this, the utilization would not be fully reflected

Parameters:

  • event (Karafka::Core::Monitoring::Event)


158
159
160
161
162
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 158

def on_worker_processed(event)
  jq_stats = event[:jobs_queue].statistics

  histogram('worker.processing', jq_stats[:busy], tags: default_tags)
end

#setup(&block) ⇒ Object

Note:

We define this alias to be consistent with ‘WaterDrop#setup`

Parameters:

  • block (Proc)

    configuration block



66
67
68
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 66

def setup(&block)
  configure(&block)
end