Class: Kafka::Statsd::ProducerSubscriber

Inherits:
StatsdSubscriber
  • Object
show all
Defined in:
lib/kafka/statsd.rb

Instance Method Summary collapse

Instance Method Details

#ack_message(event) ⇒ Object



231
232
233
234
235
236
237
238
239
240
# File 'lib/kafka/statsd.rb', line 231

# Reports StatsD metrics for a producer message acknowledgement.
#
# Emits, in order:
# * a counter increment on "producer.<client>.<topic>.ack.messages"
# * a timing sample on "producer.<client>.<topic>.ack.delay" carrying the
#   payload's :delay value (the time between a message being produced and
#   it being ACK'd).
#
# @param event [#payload] instrumentation event; its payload must supply
#   :client_id, :topic and :delay via +fetch+.
# @raise [KeyError] if a required key is missing (assuming the payload is a
#   Hash). :delay is looked up only after the counter has been incremented.
def ack_message(event)
  payload = event.payload
  prefix = "producer.#{payload.fetch(:client_id)}.#{payload.fetch(:topic)}.ack"

  # One more message acknowledged for this topic.
  increment("#{prefix}.messages")

  # Produce-to-ACK delay, recorded as a timing so StatsD can aggregate it.
  timing("#{prefix}.delay", payload.fetch(:delay))
end

#buffer_overflow(event) ⇒ Object



206
207
208
209
210
211
# File 'lib/kafka/statsd.rb', line 206

# Reports a producer buffer overflow as a produce error.
#
# Increments the counter "producer.<client>.<topic>.produce.errors".
#
# @param event [#payload] instrumentation event; its payload must supply
#   :client_id and :topic via +fetch+.
# @raise [KeyError] if either key is missing (assuming the payload is a Hash).
def buffer_overflow(event)
  payload = event.payload
  client_id = payload.fetch(:client_id)
  topic_name = payload.fetch(:topic)

  metric = "producer.#{client_id}.#{topic_name}.produce.errors"
  increment(metric)
end

#deliver_messages(event) ⇒ Object



213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/kafka/statsd.rb', line 213

# Reports StatsD metrics for one producer delivery.
#
# Emits, in order, under "producer.<client>.deliver":
# * ".errors"   - counter increment, only when the payload has an :exception key
# * ".latency"  - timing sample of +event.duration+
# * ".messages" - count of :delivered_message_count
# * ".attempts" - timing sample of :attempts
#
# @param event [#payload, #duration] instrumentation event; its payload must
#   supply :client_id, :delivered_message_count and :attempts via +fetch+.
# @raise [KeyError] if a required key is missing (assuming the payload is a
#   Hash); all three are fetched before any metric is emitted.
def deliver_messages(event)
  payload = event.payload
  client_id = payload.fetch(:client_id)
  delivered = payload.fetch(:delivered_message_count)
  attempt_count = payload.fetch(:attempts)
  prefix = "producer.#{client_id}.deliver"

  # The mere presence of the key marks the delivery as failed.
  increment("#{prefix}.errors") if payload.key?(:exception)

  timing("#{prefix}.latency", event.duration)

  # How many messages this delivery handed to Kafka.
  count("#{prefix}.messages", delivered)

  # How many attempts the delivery needed.
  timing("#{prefix}.attempts", attempt_count)
end

#produce_message(event) ⇒ Object



184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/kafka/statsd.rb', line 184

# Reports StatsD metrics when a message is added to the producer buffer.
#
# Emits, in order:
# * counter increment "producer.<client>.<topic>.produce.messages"
# * timing "producer.<client>.<topic>.produce.message_size" (:message_size)
# * timing "producer.<client>.buffer.size" (:buffer_size)
# * timing "producer.<client>.buffer.fill_ratio"
#   (:buffer_size / :max_buffer_size, as floats)
# * timing "producer.<client>.buffer.fill_percentage" (the ratio times 100.0)
#
# @param event [#payload] instrumentation event; its payload must supply
#   :client_id, :topic, :message_size, :buffer_size and :max_buffer_size via
#   +fetch+.
# @raise [KeyError] if a required key is missing (assuming the payload is a
#   Hash); every key is fetched before any metric is emitted.
def produce_message(event)
  payload = event.payload
  client_id = payload.fetch(:client_id)
  topic_name = payload.fetch(:topic)
  bytes = payload.fetch(:message_size)
  buffered = payload.fetch(:buffer_size)
  capacity = payload.fetch(:max_buffer_size)

  fill_ratio = buffered.to_f / capacity.to_f
  fill_percentage = fill_ratio * 100.0

  topic_prefix = "producer.#{client_id}.#{topic_name}.produce"
  buffer_prefix = "producer.#{client_id}.buffer"

  # Incrementing once per message yields the write rate.
  increment("#{topic_prefix}.messages")

  timing("#{topic_prefix}.message_size", bytes)

  # Timings give avg/max aggregates of the buffer size per producer.
  timing("#{buffer_prefix}.size", buffered)

  # Same for how full the buffer is, as a ratio and as a percentage.
  timing("#{buffer_prefix}.fill_ratio", fill_ratio)
  timing("#{buffer_prefix}.fill_percentage", fill_percentage)
end

#topic_error(event) ⇒ Object



242
243
244
245
246
247
# File 'lib/kafka/statsd.rb', line 242

# Reports a topic-level error as an ACK error.
#
# Increments the counter "producer.<client>.<topic>.ack.errors".
#
# @param event [#payload] instrumentation event; its payload must supply
#   :client_id and :topic via +fetch+.
# @raise [KeyError] if either key is missing (assuming the payload is a Hash).
def topic_error(event)
  payload = event.payload
  scope = "producer.#{payload.fetch(:client_id)}.#{payload.fetch(:topic)}"

  increment("#{scope}.ack.errors")
end