Class: Kafka::Statsd::ProducerSubscriber
- Inherits:
-
StatsdSubscriber
- Object
- ActiveSupport::Subscriber
- StatsdSubscriber
- Kafka::Statsd::ProducerSubscriber
- Defined in:
- lib/kafka/statsd.rb
Instance Method Summary collapse
- #ack_message(event) ⇒ Object
- #buffer_overflow(event) ⇒ Object
- #deliver_messages(event) ⇒ Object
- #produce_message(event) ⇒ Object
- #topic_error(event) ⇒ Object
Instance Method Details
#ack_message(event) ⇒ Object
201 202 203 204 205 206 207 208 209 210 |
# File 'lib/kafka/statsd.rb', line 201 def (event) client = event.payload.fetch(:client_id) topic = event.payload.fetch(:topic) # Number of messages ACK'd for the topic. increment("producer.#{client}.#{topic}.ack.messages") # Histogram of delay between a message being produced and it being ACK'd. timing("producer.#{client}.#{topic}.ack.delay", event.payload.fetch(:delay)) end |
#buffer_overflow(event) ⇒ Object
176 177 178 179 180 181 |
# File 'lib/kafka/statsd.rb', line 176 def buffer_overflow(event) client = event.payload.fetch(:client_id) topic = event.payload.fetch(:topic) increment("producer.#{client}.#{topic}.produce.errors") end |
#deliver_messages(event) ⇒ Object
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 |
# File 'lib/kafka/statsd.rb', line 183 def (event) client = event.payload.fetch(:client_id) = event.payload.fetch(:delivered_message_count) attempts = event.payload.fetch(:attempts) if event.payload.key?(:exception) increment("producer.#{client}.deliver.errors") end timing("producer.#{client}.deliver.latency", event.duration) # Messages delivered to Kafka: count("producer.#{client}.deliver.messages", ) # Number of attempts to deliver messages: timing("producer.#{client}.deliver.attempts", attempts) end |
#produce_message(event) ⇒ Object
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
# File 'lib/kafka/statsd.rb', line 154 def (event) client = event.payload.fetch(:client_id) topic = event.payload.fetch(:topic) = event.payload.fetch(:message_size) buffer_size = event.payload.fetch(:buffer_size) max_buffer_size = event.payload.fetch(:max_buffer_size) buffer_fill_ratio = buffer_size.to_f / max_buffer_size.to_f buffer_fill_percentage = buffer_fill_ratio * 100.0 # This gets us the write rate. increment("producer.#{client}.#{topic}.produce.messages") timing("producer.#{client}.#{topic}.produce.message_size", ) # This gets us the avg/max buffer size per producer. timing("producer.#{client}.buffer.size", buffer_size) # This gets us the avg/max buffer fill ratio per producer. timing("producer.#{client}.buffer.fill_ratio", buffer_fill_ratio) timing("producer.#{client}.buffer.fill_percentage", buffer_fill_percentage) end |
#topic_error(event) ⇒ Object
212 213 214 215 216 217 |
# File 'lib/kafka/statsd.rb', line 212 def topic_error(event) client = event.payload.fetch(:client_id) topic = event.payload.fetch(:topic) increment("producer.#{client}.#{topic}.ack.errors") end |