Class: Racecar::Datadog::ConsumerSubscriber
- Inherits:
-
StatsdSubscriber
- Object
- ActiveSupport::Subscriber
- StatsdSubscriber
- Racecar::Datadog::ConsumerSubscriber
- Defined in:
- lib/racecar/datadog.rb
Instance Method Summary collapse
- #join_group(event) ⇒ Object
- #leave_group(event) ⇒ Object
- #main_loop(event) ⇒ Object
- #pause_status(event) ⇒ Object
- #poll_retry(event) ⇒ Object
- #process_batch(event) ⇒ Object
- #process_message(event) ⇒ Object
Instance Method Details
#join_group(event) ⇒ Object
134 135 136 137 138 139 140 141 142 143 144 145 |
# File 'lib/racecar/datadog.rb', line 134 def join_group(event) = { client: event.payload.fetch(:client_id), group_id: event.payload.fetch(:group_id), } timing("consumer.join_group", event.duration, tags: ) if event.payload.key?(:exception) increment("consumer.join_group.errors", tags: ) end end |
#leave_group(event) ⇒ Object
147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/racecar/datadog.rb', line 147 def leave_group(event) = { client: event.payload.fetch(:client_id), group_id: event.payload.fetch(:group_id), } timing("consumer.leave_group", event.duration, tags: ) if event.payload.key?(:exception) increment("consumer.leave_group.errors", tags: ) end end |
#main_loop(event) ⇒ Object
169 170 171 172 173 174 175 176 |
# File 'lib/racecar/datadog.rb', line 169 def main_loop(event) = { client: event.payload.fetch(:client_id), group_id: event.payload.fetch(:group_id), } histogram("consumer.loop.duration", event.duration, tags: ) end |
#pause_status(event) ⇒ Object
178 179 180 181 182 |
# File 'lib/racecar/datadog.rb', line 178 def pause_status(event) duration = event.payload.fetch(:duration) gauge("consumer.pause.duration", duration, tags: (event)) end |
#poll_retry(event) ⇒ Object
160 161 162 163 164 165 166 167 |
# File 'lib/racecar/datadog.rb', line 160 def poll_retry(event) = { client: event.payload.fetch(:client_id), group_id: event.payload.fetch(:group_id), } rdkafka_error_code = event.payload.fetch(:exception).code.to_s.gsub(/\W/, '') increment("consumer.poll.rdkafka_error.#{rdkafka_error_code}", tags: ) end |
#process_batch(event) ⇒ Object
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/racecar/datadog.rb', line 112 def process_batch(event) offset = event.payload.fetch(:last_offset) = event.payload.fetch(:message_count) last_create_time = event.payload.fetch(:last_create_time) time_lag = last_create_time && ((Time.now - last_create_time) * 1000).to_i = (event) if event.payload.key?(:exception) increment("consumer.process_batch.errors", tags: ) else timing("consumer.process_batch.latency", event.duration, tags: ) count("consumer.messages", , tags: ) end histogram("consumer.batch_size", , tags: ) gauge("consumer.offset", offset, tags: ) if time_lag gauge("consumer.time_lag", time_lag, tags: ) end end |
#process_message(event) ⇒ Object
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/racecar/datadog.rb', line 91 def (event) offset = event.payload.fetch(:offset) create_time = event.payload.fetch(:create_time) time_lag = create_time && ((Time.now - create_time) * 1000).to_i = (event) if event.payload.key?(:exception) increment("consumer.process_message.errors", tags: ) else timing("consumer.process_message.latency", event.duration, tags: ) increment("consumer.messages", tags: ) end gauge("consumer.offset", offset, tags: ) # Not all messages have timestamps. if time_lag gauge("consumer.time_lag", time_lag, tags: ) end end |