Class: Kafka::Statsd::ProducerSubscriber

Inherits:
StatsdSubscriber
  • Object
show all
Defined in:
lib/kafka/statsd.rb

Instance Method Summary collapse

Instance Method Details

#ack_message(event) ⇒ Object



201
202
203
204
205
206
207
208
209
210
# File 'lib/kafka/statsd.rb', line 201

# Records metrics for an acknowledged message, keyed by client and topic.
#
# Reads :client_id, :topic and :delay from the event payload using +fetch+,
# so a missing key raises instead of producing a malformed metric name.
#
# @param event [#payload] notification event carrying the payload
def ack_message(event)
  payload = event.payload
  prefix = "producer.#{payload.fetch(:client_id)}.#{payload.fetch(:topic)}.ack"

  # Number of messages ACK'd for the topic.
  increment("#{prefix}.messages")

  # Histogram of delay between a message being produced and it being ACK'd.
  # :delay is looked up only after the counter above has been bumped.
  timing("#{prefix}.delay", payload.fetch(:delay))
end

#buffer_overflow(event) ⇒ Object



176
177
178
179
180
181
# File 'lib/kafka/statsd.rb', line 176

# Counts a produce error for the client/topic named in the event payload.
#
# Both :client_id and :topic are required; +fetch+ raises when either is
# absent.
#
# @param event [#payload] notification event carrying the payload
def buffer_overflow(event)
  payload = event.payload
  # Fetch in client, topic order so a missing :client_id is reported first.
  segments = %i[client_id topic].map { |key| payload.fetch(key) }

  increment("producer.#{segments.join('.')}.produce.errors")
end

#deliver_messages(event) ⇒ Object



183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/kafka/statsd.rb', line 183

# Records per-client delivery metrics for a deliver_messages event.
#
# Reads :client_id, :delivered_message_count and :attempts from the payload
# (all required) before emitting anything, and uses +event.duration+ for the
# latency metric.
#
# @param event [#payload, #duration] notification event for the delivery
def deliver_messages(event)
  payload = event.payload
  prefix = "producer.#{payload.fetch(:client_id)}.deliver"
  delivered = payload.fetch(:delivered_message_count)
  attempt_count = payload.fetch(:attempts)

  # An :exception key in the payload marks the delivery as failed.
  increment("#{prefix}.errors") if payload.key?(:exception)

  timing("#{prefix}.latency", event.duration)

  # Messages delivered to Kafka:
  count("#{prefix}.messages", delivered)

  # Number of attempts to deliver messages:
  timing("#{prefix}.attempts", attempt_count)
end

#produce_message(event) ⇒ Object



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/kafka/statsd.rb', line 154

# Records metrics for a produced message: a per-topic counter and message
# size, plus per-client buffer size and fill level.
#
# Requires :client_id, :topic, :message_size, :buffer_size and
# :max_buffer_size in the payload; all are fetched before anything is
# emitted.
#
# @param event [#payload] notification event carrying the payload
def produce_message(event)
  payload = event.payload
  client_prefix = "producer.#{payload.fetch(:client_id)}"
  topic_prefix = "#{client_prefix}.#{payload.fetch(:topic)}.produce"
  bytes = payload.fetch(:message_size)
  buffered = payload.fetch(:buffer_size)
  capacity = payload.fetch(:max_buffer_size)
  # Float division so integer sizes produce a fractional ratio.
  fill_ratio = buffered.to_f / capacity.to_f

  # This gets us the write rate.
  increment("#{topic_prefix}.messages")

  timing("#{topic_prefix}.message_size", bytes)

  # This gets us the avg/max buffer size per producer.
  timing("#{client_prefix}.buffer.size", buffered)

  # This gets us the avg/max buffer fill ratio per producer, reported both
  # as a ratio and as a percentage.
  timing("#{client_prefix}.buffer.fill_ratio", fill_ratio)
  timing("#{client_prefix}.buffer.fill_percentage", fill_ratio * 100.0)
end

#topic_error(event) ⇒ Object



212
213
214
215
216
217
# File 'lib/kafka/statsd.rb', line 212

# Counts an ACK error for the client/topic named in the event payload.
#
# Both :client_id and :topic are required; +fetch+ raises when either is
# absent.
#
# @param event [#payload] notification event carrying the payload
def topic_error(event)
  payload = event.payload
  client_id = payload.fetch(:client_id)
  topic_name = payload.fetch(:topic)

  # %s stringifies its arguments the same way string interpolation does.
  increment(format("producer.%s.%s.ack.errors", client_id, topic_name))
end