Class: Kafka::Statsd::ProducerSubscriber
- Inherits: StatsdSubscriber
- Ancestor chain: Object → ActiveSupport::Subscriber → StatsdSubscriber → Kafka::Statsd::ProducerSubscriber
- Defined in:
- lib/kafka/statsd.rb
Instance Method Summary collapse
- #ack_message(event) ⇒ Object
- #buffer_overflow(event) ⇒ Object
- #deliver_messages(event) ⇒ Object
- #produce_message(event) ⇒ Object
- #topic_error(event) ⇒ Object
Instance Method Details
#ack_message(event) ⇒ Object
163 164 165 166 167 168 169 170 171 172 |
# File 'lib/kafka/statsd.rb', line 163 def (event) client = event.payload.fetch(:client_id) topic = event.payload.fetch(:topic) # Number of messages ACK'd for the topic. increment("producer.#{client}.#{topic}.ack.messages") # Histogram of delay between a message being produced and it being ACK'd. timing("producer.#{client}.#{topic}.ack.delay", event.payload.fetch(:delay)) end |
#buffer_overflow(event) ⇒ Object
138 139 140 141 142 143 |
# File 'lib/kafka/statsd.rb', line 138 def buffer_overflow(event) client = event.payload.fetch(:client_id) topic = event.payload.fetch(:topic) increment("producer.#{client}.#{topic}.produce.errors") end |
#deliver_messages(event) ⇒ Object
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
# File 'lib/kafka/statsd.rb', line 145 def (event) client = event.payload.fetch(:client_id) = event.payload.fetch(:delivered_message_count) attempts = event.payload.fetch(:attempts) if event.payload.key?(:exception) increment("producer.#{client}.deliver.errors") end timing("producer.#{client}.deliver.latency", event.duration) # Messages delivered to Kafka: count("producer.#{client}.deliver.messages", ) # Number of attempts to deliver messages: timing("producer.#{client}.deliver.attempts", attempts) end |
#produce_message(event) ⇒ Object
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/kafka/statsd.rb', line 118 def (event) client = event.payload.fetch(:client_id) topic = event.payload.fetch(:topic) = event.payload.fetch(:message_size) buffer_size = event.payload.fetch(:buffer_size) max_buffer_size = event.payload.fetch(:max_buffer_size) buffer_fill_ratio = buffer_size.to_f / max_buffer_size.to_f # This gets us the write rate. increment("producer.#{client}.#{topic}.produce.messages") timing("producer.#{client}.#{topic}.produce.message_size", ) # This gets us the avg/max buffer size per producer. timing("producer.#{client}.buffer.size", buffer_size) # This gets us the avg/max buffer fill ratio per producer. timing("producer.#{client}.buffer.fill_ratio", buffer_fill_ratio) end |
#topic_error(event) ⇒ Object
174 175 176 177 178 179 |
# File 'lib/kafka/statsd.rb', line 174 def topic_error(event) client = event.payload.fetch(:client_id) topic = event.payload.fetch(:topic) increment("producer.#{client}.#{topic}.ack.errors") end |