Class: Racecar::Datadog::ProducerSubscriber
- Inherits:
-
StatsdSubscriber
- Object
- ActiveSupport::Subscriber
- StatsdSubscriber
- Racecar::Datadog::ProducerSubscriber
- Defined in:
- lib/racecar/datadog.rb
Instance Method Summary collapse
- #acknowledged_message(event) ⇒ Object
- #deliver_messages(event) ⇒ Object
- #produce_async(event) ⇒ Object
- #produce_delivery_error(event) ⇒ Object
- #produce_message(event) ⇒ Object
- #produce_sync(event) ⇒ Object
Instance Method Details
#acknowledged_message(event) ⇒ Object
# File 'lib/racecar/datadog.rb', line 245

# Emitted when the broker ACKs a produced message.
# event.payload must contain :client_id.
def acknowledged_message(event)
  tags = { client: event.payload.fetch(:client_id) }

  # Number of messages ACK'd for the topic.
  increment("producer.ack.messages", tags: tags)
end
#deliver_messages(event) ⇒ Object
# File 'lib/racecar/datadog.rb', line 231

# Emitted after a delivery attempt; records latency and the number of
# messages actually delivered. event.payload must contain :client_id and
# :delivered_message_count; event.duration is the delivery wall time.
def deliver_messages(event)
  client = event.payload.fetch(:client_id)
  message_count = event.payload.fetch(:delivered_message_count)

  tags = {
    client: client,
  }

  timing("producer.deliver.latency", event.duration, tags: tags)

  # Messages delivered to Kafka:
  count("producer.deliver.messages", message_count, tags: tags)
end
#produce_async(event) ⇒ Object
# File 'lib/racecar/datadog.rb', line 260

# Emitted for each message produced asynchronously. event.payload must
# contain :client_id, :topic, :message_size, and :buffer_size; it carries
# an :exception key when production failed.
def produce_async(event)
  client = event.payload.fetch(:client_id)
  topic = event.payload.fetch(:topic)
  message_size = event.payload.fetch(:message_size)
  buffer_size = event.payload.fetch(:buffer_size)

  tags = {
    client: client,
    topic: topic,
  }

  if event.payload.key?(:exception)
    increment("producer.produce.errors", tags: tags)
  end

  # This gets us the write rate.
  increment("producer.produce.messages", tags: tags.merge(topic: topic))

  # Information about typical/average/95p message size.
  histogram("producer.produce.message_size", message_size, tags: tags.merge(topic: topic))

  # Aggregate message size.
  count("producer.produce.message_size.sum", message_size, tags: tags.merge(topic: topic))

  # This gets us the avg/max buffer size per producer.
  histogram("producer.buffer.size", buffer_size, tags: tags)
end
#produce_delivery_error(event) ⇒ Object
# File 'lib/racecar/datadog.rb', line 252

# Emitted when delivery of a produced message fails.
# event.payload must contain :client_id.
def produce_delivery_error(event)
  tags = {
    client: event.payload.fetch(:client_id),
  }

  increment("producer.produce.delivery.errors", tags: tags)
end
#produce_message(event) ⇒ Object
# File 'lib/racecar/datadog.rb', line 203

# Emitted for each message written to the producer's buffer. event.payload
# must contain :client_id, :topic, :message_size, and :buffer_size; it
# carries an :exception key when production failed.
def produce_message(event)
  client = event.payload.fetch(:client_id)
  topic = event.payload.fetch(:topic)
  message_size = event.payload.fetch(:message_size)
  buffer_size = event.payload.fetch(:buffer_size)

  tags = {
    client: client,
    topic: topic,
  }

  if event.payload.key?(:exception)
    increment("producer.produce.errors", tags: tags)
  end

  # This gets us the write rate.
  increment("producer.produce.messages", tags: tags.merge(topic: topic))

  # Information about typical/average/95p message size.
  histogram("producer.produce.message_size", message_size, tags: tags.merge(topic: topic))

  # Aggregate message size.
  count("producer.produce.message_size.sum", message_size, tags: tags.merge(topic: topic))

  # This gets us the avg/max buffer size per producer.
  histogram("producer.buffer.size", buffer_size, tags: tags)
end
#produce_sync(event) ⇒ Object
# File 'lib/racecar/datadog.rb', line 288

# Emitted for each message produced synchronously. event.payload must
# contain :client_id, :topic, and :message_size; it carries an :exception
# key when production failed. No buffer metrics here — sync production
# does not buffer.
def produce_sync(event)
  client = event.payload.fetch(:client_id)
  topic = event.payload.fetch(:topic)
  message_size = event.payload.fetch(:message_size)

  tags = {
    client: client,
    topic: topic,
  }

  if event.payload.key?(:exception)
    increment("producer.produce.errors", tags: tags)
  end

  # This gets us the write rate.
  increment("producer.produce.messages", tags: tags.merge(topic: topic))

  # Information about typical/average/95p message size.
  histogram("producer.produce.message_size", message_size, tags: tags.merge(topic: topic))

  # Aggregate message size.
  count("producer.produce.message_size.sum", message_size, tags: tags.merge(topic: topic))
end