Class: Kafka::Statsd::ProducerSubscriber
- Inherits:
-
StatsdSubscriber
- Object
- ActiveSupport::Subscriber
- StatsdSubscriber
- Kafka::Statsd::ProducerSubscriber
- Defined in:
- lib/kafka/statsd.rb
Instance Method Summary collapse
- #ack_message(event) ⇒ Object
- #buffer_overflow(event) ⇒ Object
- #deliver_messages(event) ⇒ Object
- #produce_message(event) ⇒ Object
- #topic_error(event) ⇒ Object
Instance Method Details
#ack_message(event) ⇒ Object
209 210 211 212 213 214 215 216 217 218 |
# File 'lib/kafka/statsd.rb', line 209
#
# Handles the `ack_message.producer.kafka` notification: records that one
# message was ACK'd for a topic and how long the ACK took.
#
# @param event [ActiveSupport::Notifications::Event] payload must contain
#   :client_id, :topic and :delay.
# @return [void]
def ack_message(event)
  client = event.payload.fetch(:client_id)
  topic = event.payload.fetch(:topic)

  # Number of messages ACK'd for the topic.
  increment("producer.#{client}.#{topic}.ack.messages")

  # Histogram of delay between a message being produced and it being ACK'd.
  timing("producer.#{client}.#{topic}.ack.delay", event.payload.fetch(:delay))
end
#buffer_overflow(event) ⇒ Object
184 185 186 187 188 189 |
# File 'lib/kafka/statsd.rb', line 184
#
# Handles the `buffer_overflow.producer.kafka` notification: counts a produce
# error caused by the producer's message buffer being full.
#
# @param event [ActiveSupport::Notifications::Event] payload must contain
#   :client_id and :topic.
# @return [void]
def buffer_overflow(event)
  client = event.payload.fetch(:client_id)
  topic = event.payload.fetch(:topic)

  # A buffer overflow is reported as a produce error for the topic.
  increment("producer.#{client}.#{topic}.produce.errors")
end
#deliver_messages(event) ⇒ Object
191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
# File 'lib/kafka/statsd.rb', line 191
#
# Handles the `deliver_messages.producer.kafka` notification: records delivery
# latency, delivered message count, delivery attempts, and any delivery error.
#
# @param event [ActiveSupport::Notifications::Event] payload must contain
#   :client_id, :delivered_message_count and :attempts; :exception is present
#   only when delivery failed.
# @return [void]
def deliver_messages(event)
  client = event.payload.fetch(:client_id)
  message_count = event.payload.fetch(:delivered_message_count)
  attempts = event.payload.fetch(:attempts)

  if event.payload.key?(:exception)
    increment("producer.#{client}.deliver.errors")
  end

  timing("producer.#{client}.deliver.latency", event.duration)

  # Messages delivered to Kafka:
  count("producer.#{client}.deliver.messages", message_count)

  # Number of attempts to deliver messages:
  timing("producer.#{client}.deliver.attempts", attempts)
end
#produce_message(event) ⇒ Object
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
# File 'lib/kafka/statsd.rb', line 162
#
# Handles the `produce_message.producer.kafka` notification: records the write
# rate, message size, and producer buffer utilization.
#
# @param event [ActiveSupport::Notifications::Event] payload must contain
#   :client_id, :topic, :message_size, :buffer_size and :max_buffer_size.
# @return [void]
def produce_message(event)
  client = event.payload.fetch(:client_id)
  topic = event.payload.fetch(:topic)
  message_size = event.payload.fetch(:message_size)
  buffer_size = event.payload.fetch(:buffer_size)
  max_buffer_size = event.payload.fetch(:max_buffer_size)

  # Ratio/percentage of the buffer currently in use; .to_f avoids integer division.
  buffer_fill_ratio = buffer_size.to_f / max_buffer_size.to_f
  buffer_fill_percentage = buffer_fill_ratio * 100.0

  # This gets us the write rate.
  increment("producer.#{client}.#{topic}.produce.messages")

  timing("producer.#{client}.#{topic}.produce.message_size", message_size)

  # This gets us the avg/max buffer size per producer.
  timing("producer.#{client}.buffer.size", buffer_size)

  # This gets us the avg/max buffer fill ratio per producer.
  timing("producer.#{client}.buffer.fill_ratio", buffer_fill_ratio)
  timing("producer.#{client}.buffer.fill_percentage", buffer_fill_percentage)
end
#topic_error(event) ⇒ Object
220 221 222 223 224 225 |
# File 'lib/kafka/statsd.rb', line 220
#
# Handles the `topic_error.producer.kafka` notification: counts an ACK error
# reported by the broker for a topic.
#
# @param event [ActiveSupport::Notifications::Event] payload must contain
#   :client_id and :topic.
# @return [void]
def topic_error(event)
  client = event.payload.fetch(:client_id)
  topic = event.payload.fetch(:topic)

  increment("producer.#{client}.#{topic}.ack.errors")
end