Class: Kafka::Statsd::ProducerSubscriber

Inherits:
StatsdSubscriber
  • Object
show all
Defined in:
lib/kafka/statsd.rb

Instance Method Summary

Instance Method Details

#ack_message(event) ⇒ Object



209
210
211
212
213
214
215
216
217
218
# File 'lib/kafka/statsd.rb', line 209

# Reports statsd metrics for a message acknowledgement event.
#
# Reads :client_id, :topic and :delay from the event payload. Every key is
# read with `fetch`, so a missing key raises KeyError. :delay is only read
# after the counter has been incremented.
def ack_message(event)
  payload = event.payload
  prefix = "producer.#{payload.fetch(:client_id)}.#{payload.fetch(:topic)}.ack"

  # Counter: one more message ACK'd for this topic.
  increment("#{prefix}.messages")

  # Timing sample: delay between the message being produced and being ACK'd.
  timing("#{prefix}.delay", payload.fetch(:delay))
end

#buffer_overflow(event) ⇒ Object



184
185
186
187
188
189
# File 'lib/kafka/statsd.rb', line 184

# Counts a buffer overflow event as a produce error for the client and topic
# named in the event payload.
#
# Both :client_id and :topic are read with `fetch`; a missing key raises
# KeyError before anything is emitted.
def buffer_overflow(event)
  payload = event.payload
  client_id = payload.fetch(:client_id)
  topic_name = payload.fetch(:topic)

  increment("producer.#{client_id}.#{topic_name}.produce.errors")
end

#deliver_messages(event) ⇒ Object



191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/kafka/statsd.rb', line 191

# Reports statsd metrics for a message delivery event.
#
# Reads :client_id, :delivered_message_count and :attempts from the event
# payload, in that order, before emitting anything. Each is read with
# `fetch`, so a missing key raises KeyError. Also reads `event.duration`.
def deliver_messages(event)
  payload = event.payload
  client, delivered, attempt_count =
    %i[client_id delivered_message_count attempts].map { |key| payload.fetch(key) }
  prefix = "producer.#{client}.deliver"

  # An :exception key in the payload marks the delivery as failed.
  increment("#{prefix}.errors") if payload.key?(:exception)

  timing("#{prefix}.latency", event.duration)

  # Number of messages delivered to Kafka.
  count("#{prefix}.messages", delivered)

  # Number of delivery attempts, reported as a timing sample.
  timing("#{prefix}.attempts", attempt_count)
end

#produce_message(event) ⇒ Object



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/kafka/statsd.rb', line 162

# Reports statsd metrics for a message being produced.
#
# Reads :client_id, :topic, :message_size, :buffer_size and :max_buffer_size
# from the event payload, in that order, before emitting anything. Each is
# read with `fetch`, so a missing key raises KeyError.
def produce_message(event)
  payload = event.payload
  client, topic, message_size, buffer_size, max_buffer_size =
    %i[client_id topic message_size buffer_size max_buffer_size].map { |key| payload.fetch(key) }

  # Fraction of the buffer in use; both operands are converted to floats first.
  fill_ratio = buffer_size.to_f / max_buffer_size.to_f

  topic_prefix = "producer.#{client}.#{topic}.produce"
  buffer_prefix = "producer.#{client}.buffer"

  # Per-topic counter; this gives the write rate.
  increment("#{topic_prefix}.messages")

  timing("#{topic_prefix}.message_size", message_size)

  # Per-producer buffer size (avg/max).
  timing("#{buffer_prefix}.size", buffer_size)

  # Per-producer buffer fill level, as a ratio and as a percentage.
  timing("#{buffer_prefix}.fill_ratio", fill_ratio)
  timing("#{buffer_prefix}.fill_percentage", fill_ratio * 100.0)
end

#topic_error(event) ⇒ Object



220
221
222
223
224
225
# File 'lib/kafka/statsd.rb', line 220

# Counts a topic error event as an ack error for the client and topic named
# in the event payload.
#
# :client_id and :topic are read with `fetch`, in that order; a missing key
# raises KeyError before anything is emitted.
def topic_error(event)
  client, topic = %i[client_id topic].map { |key| event.payload.fetch(key) }

  increment("producer.#{client}.#{topic}.ack.errors")
end