Class: Kafka::Prometheus::ConsumerSubscriber

Inherits:
ActiveSupport::Subscriber (which inherits from Object)
Defined in:
lib/kafka/prometheus.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ ConsumerSubscriber

Returns a new instance of ConsumerSubscriber.


70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/kafka/prometheus.rb', line 70

# Registers every consumer-level metric with the shared Prometheus registry.
# Counters track totals, histograms track timing/size distributions, and
# gauges track point-in-time lag values.
def initialize
  super
  registry = Prometheus.registry

  # Per-message processing metrics.
  @process_messages = registry.counter(:consumer_process_messages, 'Total messages')
  @process_message_errors = registry.counter(:consumer_process_message_errors, 'Total errors')
  @process_message_latency =
    registry.histogram(:consumer_process_message_latency, 'Latency', {}, LATENCY_BUCKETS)
  @offset_lag = registry.gauge(:consumer_offset_lag, 'Offset lag')
  @time_lag = registry.gauge(:consumer_time_lag, 'Time lag of message')

  # Per-batch processing metrics.
  @process_batch_errors = registry.counter(:consumer_process_batch_errors, 'Total errors in batch')
  @process_batch_latency =
    registry.histogram(:consumer_process_batch_latency, 'Latency in batch', {}, LATENCY_BUCKETS)
  @batch_size = registry.histogram(:consumer_batch_size, 'Size of batch', {}, SIZE_BUCKETS)

  # Group-membership lifecycle metrics.
  @join_group = registry.histogram(:consumer_join_group, 'Time to join group', {}, DELAY_BUCKETS)
  @join_group_errors = registry.counter(:consumer_join_group_errors, 'Total error in joining group')
  @sync_group = registry.histogram(:consumer_sync_group, 'Time to sync group', {}, DELAY_BUCKETS)
  @sync_group_errors = registry.counter(:consumer_sync_group_errors, 'Total error in syncing group')
  @leave_group = registry.histogram(:consumer_leave_group, 'Time to leave group', {}, DELAY_BUCKETS)
  @leave_group_errors = registry.counter(:consumer_leave_group_errors, 'Total error in leaving group')
  @pause_duration = registry.gauge(:consumer_pause_duration, 'Pause duration')
end

Instance Method Details

#fetch_batch(event) ⇒ Object


136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/kafka/prometheus.rb', line 136

# Records the size of a fetched batch and the current offset lag for the
# (client, group, topic, partition) the batch belongs to.
def fetch_batch(event)
  payload = event.payload
  labels = {
    client: payload.fetch(:client_id),
    group_id: payload.fetch(:group_id),
    topic: payload.fetch(:topic),
    partition: payload.fetch(:partition)
  }
  # Fetch both values before touching any metric so a missing key raises
  # before anything is recorded (same ordering as the original fetches).
  lag = payload.fetch(:offset_lag)
  count = payload.fetch(:message_count)

  @batch_size.observe(labels, count)
  @offset_lag.set(labels, lag)
end

#join_group(event) ⇒ Object


150
151
152
153
154
155
# File 'lib/kafka/prometheus.rb', line 150

# Records how long joining the consumer group took; counts failed attempts.
def join_group(event)
  labels = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id)
  }

  @join_group.observe(labels, event.duration)
  # A payload carrying :exception means the join attempt failed.
  @join_group_errors.increment(labels) if event.payload.key?(:exception)
end

#leave_group(event) ⇒ Object


164
165
166
167
168
169
# File 'lib/kafka/prometheus.rb', line 164

# Records how long leaving the consumer group took; counts failed attempts.
def leave_group(event)
  labels = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id)
  }

  @leave_group.observe(labels, event.duration)
  # A payload carrying :exception means the leave attempt failed.
  @leave_group_errors.increment(labels) if event.payload.key?(:exception)
end

#pause_status(event) ⇒ Object


171
172
173
174
175
176
177
178
179
180
181
# File 'lib/kafka/prometheus.rb', line 171

# Publishes the current pause duration for a paused partition as a gauge.
def pause_status(event)
  payload = event.payload
  labels = {
    client: payload.fetch(:client_id),
    group_id: payload.fetch(:group_id),
    topic: payload.fetch(:topic),
    partition: payload.fetch(:partition)
  }

  @pause_duration.set(labels, payload.fetch(:duration))
end

#process_batch(event) ⇒ Object


119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/kafka/prometheus.rb', line 119

# Tracks batch processing: on failure count the error; on success record
# the batch latency and bump the processed-message counter by the batch size.
def process_batch(event)
  payload = event.payload
  labels = {
    client: payload.fetch(:client_id),
    group_id: payload.fetch(:group_id),
    topic: payload.fetch(:topic),
    partition: payload.fetch(:partition)
  }
  count = payload.fetch(:message_count)

  if payload.key?(:exception)
    @process_batch_errors.increment(labels)
  else
    @process_batch_latency.observe(labels, event.duration)
    @process_messages.increment(labels, count)
  end
end

#process_message(event) ⇒ Object


91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/kafka/prometheus.rb', line 91

# Tracks per-message processing: error count or latency+count depending on
# outcome, plus offset lag and (when the message has a timestamp) time lag.
def process_message(event)
  payload = event.payload
  labels = {
    client: payload.fetch(:client_id),
    group_id: payload.fetch(:group_id),
    topic: payload.fetch(:topic),
    partition: payload.fetch(:partition)
  }

  lag = payload.fetch(:offset_lag)
  created_at = payload.fetch(:create_time)

  # Milliseconds between message creation and now; nil when the message
  # carries no timestamp.
  lag_ms = created_at && ((Time.now - created_at) * 1000).to_i

  if payload.key?(:exception)
    @process_message_errors.increment(labels)
  else
    @process_message_latency.observe(labels, event.duration)
    @process_messages.increment(labels)
  end

  @offset_lag.set(labels, lag)

  # Not all messages have timestamps.
  @time_lag.set(labels, lag_ms) if lag_ms
end

#sync_group(event) ⇒ Object


157
158
159
160
161
162
# File 'lib/kafka/prometheus.rb', line 157

# Records how long syncing the consumer group took; counts failed attempts.
def sync_group(event)
  labels = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id)
  }

  @sync_group.observe(labels, event.duration)
  # A payload carrying :exception means the sync attempt failed.
  @sync_group_errors.increment(labels) if event.payload.key?(:exception)
end