Class: Kafka::Prometheus::ConsumerSubscriber
- Inherits:
-
ActiveSupport::Subscriber
- Object
- ActiveSupport::Subscriber
- Kafka::Prometheus::ConsumerSubscriber
- Defined in:
- lib/kafka/prometheus.rb
Instance Method Summary collapse
- #fetch_batch(event) ⇒ Object
-
#initialize ⇒ ConsumerSubscriber
constructor
A new instance of ConsumerSubscriber.
- #join_group(event) ⇒ Object
- #leave_group(event) ⇒ Object
- #pause_status(event) ⇒ Object
- #process_batch(event) ⇒ Object
- #process_message(event) ⇒ Object
- #sync_group(event) ⇒ Object
Constructor Details
#initialize ⇒ ConsumerSubscriber
Returns a new instance of ConsumerSubscriber.
Source: lib/kafka/prometheus.rb, lines 70-89
# File 'lib/kafka/prometheus.rb', line 70

# Registers every consumer-related metric with the shared Prometheus registry
# and memoizes the handles in instance variables so the event callbacks below
# can record observations without further registry lookups.
def initialize
  super

  # Per-message processing metrics.
  @process_messages = Prometheus.registry.counter(:consumer_process_messages, 'Total messages')
  @process_message_errors = Prometheus.registry.counter(:consumer_process_message_errors, 'Total errors')
  @process_message_latency = Prometheus.registry.histogram(:consumer_process_message_latency, 'Latency', {}, LATENCY_BUCKETS)
  @offset_lag = Prometheus.registry.gauge(:consumer_offset_lag, 'Offset lag')
  @time_lag = Prometheus.registry.gauge(:consumer_time_lag, 'Time lag of message')

  # Per-batch processing metrics.
  @process_batch_errors = Prometheus.registry.counter(:consumer_process_batch_errors, 'Total errors in batch')
  @process_batch_latency = Prometheus.registry.histogram(:consumer_process_batch_latency, 'Latency in batch', {}, LATENCY_BUCKETS)
  @batch_size = Prometheus.registry.histogram(:consumer_batch_size, 'Size of batch', {}, SIZE_BUCKETS)

  # Group-membership lifecycle metrics.
  @join_group = Prometheus.registry.histogram(:consumer_join_group, 'Time to join group', {}, DELAY_BUCKETS)
  @join_group_errors = Prometheus.registry.counter(:consumer_join_group_errors, 'Total error in joining group')
  @sync_group = Prometheus.registry.histogram(:consumer_sync_group, 'Time to sync group', {}, DELAY_BUCKETS)
  @sync_group_errors = Prometheus.registry.counter(:consumer_sync_group_errors, 'Total error in syncing group')
  @leave_group = Prometheus.registry.histogram(:consumer_leave_group, 'Time to leave group', {}, DELAY_BUCKETS)
  @leave_group_errors = Prometheus.registry.counter(:consumer_leave_group_errors, 'Total error in leaving group')

  # Pause state.
  @pause_duration = Prometheus.registry.gauge(:consumer_pause_duration, 'Pause duration')
end
Instance Method Details
#fetch_batch(event) ⇒ Object
Source: lib/kafka/prometheus.rb, lines 136-148
# File 'lib/kafka/prometheus.rb', line 136

# Records the size of a fetched batch and the consumer's current offset lag
# for the topic/partition the batch was fetched from.
def fetch_batch(event)
  labels = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id),
    topic: event.payload.fetch(:topic),
    partition: event.payload.fetch(:partition)
  }
  lag = event.payload.fetch(:offset_lag)
  fetched = event.payload.fetch(:message_count)

  @batch_size.observe(labels, fetched)
  @offset_lag.set(labels, lag)
end
#join_group(event) ⇒ Object
Source: lib/kafka/prometheus.rb, lines 150-155
# File 'lib/kafka/prometheus.rb', line 150

# Observes how long joining the consumer group took, and counts an error
# when the instrumented operation raised.
def join_group(event)
  group_labels = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id)
  }
  @join_group.observe(group_labels, event.duration)

  return unless event.payload.key?(:exception)

  @join_group_errors.increment(group_labels)
end
#leave_group(event) ⇒ Object
Source: lib/kafka/prometheus.rb, lines 164-169
# File 'lib/kafka/prometheus.rb', line 164

# Observes how long leaving the consumer group took, and counts an error
# when the instrumented operation raised.
def leave_group(event)
  group_labels = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id)
  }
  @leave_group.observe(group_labels, event.duration)

  return unless event.payload.key?(:exception)

  @leave_group_errors.increment(group_labels)
end
#pause_status(event) ⇒ Object
Source: lib/kafka/prometheus.rb, lines 171-181
# File 'lib/kafka/prometheus.rb', line 171

# Publishes how long the given topic/partition has currently been paused.
# The duration comes from the event payload, not from the event's own timing.
def pause_status(event)
  labels = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id),
    topic: event.payload.fetch(:topic),
    partition: event.payload.fetch(:partition)
  }
  @pause_duration.set(labels, event.payload.fetch(:duration))
end
#process_batch(event) ⇒ Object
Source: lib/kafka/prometheus.rb, lines 119-134
# File 'lib/kafka/prometheus.rb', line 119

# Instruments the processing of one message batch: on failure it counts an
# error; on success it records the batch latency and counts the processed
# messages by the batch's message count.
#
# NOTE(review): the extracted listing had dropped the `message_count` local
# (" = event.payload.fetch(:message_count)" / "increment(key, )"), which is a
# syntax error; the identifier is restored here.
#
# @param event [ActiveSupport::Notifications::Event] payload must contain
#   :client_id, :group_id, :topic, :partition and :message_count.
def process_batch(event)
  key = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id),
    topic: event.payload.fetch(:topic),
    partition: event.payload.fetch(:partition)
  }
  message_count = event.payload.fetch(:message_count)

  if event.payload.key?(:exception)
    @process_batch_errors.increment(key)
  else
    @process_batch_latency.observe(key, event.duration)
    @process_messages.increment(key, message_count)
  end
end
#process_message(event) ⇒ Object
Source: lib/kafka/prometheus.rb, lines 91-117
# File 'lib/kafka/prometheus.rb', line 91

# Instruments one processed message: counts successes/errors, records the
# processing latency, and updates the offset-lag and time-lag gauges.
#
# NOTE(review): the extracted listing had lost the method name ("def (event)",
# invalid Ruby); restored to #process_message, matching the method summary.
#
# @param event [ActiveSupport::Notifications::Event] payload must contain
#   :client_id, :group_id, :topic, :partition, :offset_lag and :create_time.
def process_message(event)
  key = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id),
    topic: event.payload.fetch(:topic),
    partition: event.payload.fetch(:partition)
  }

  offset_lag = event.payload.fetch(:offset_lag)
  create_time = event.payload.fetch(:create_time)
  # Milliseconds between message creation and now; nil when the message
  # carries no timestamp.
  time_lag = create_time && ((Time.now - create_time) * 1000).to_i

  if event.payload.key?(:exception)
    @process_message_errors.increment(key)
  else
    @process_message_latency.observe(key, event.duration)
    @process_messages.increment(key)
  end

  @offset_lag.set(key, offset_lag)

  # Not all messages have timestamps.
  return unless time_lag

  @time_lag.set(key, time_lag)
end
#sync_group(event) ⇒ Object
Source: lib/kafka/prometheus.rb, lines 157-162
# File 'lib/kafka/prometheus.rb', line 157

# Observes how long syncing the consumer group took, and counts an error
# when the instrumented operation raised.
def sync_group(event)
  group_labels = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id)
  }
  @sync_group.observe(group_labels, event.duration)

  return unless event.payload.key?(:exception)

  @sync_group_errors.increment(group_labels)
end