Class: Kafka::Datadog::ProducerSubscriber

Inherits:
StatsdSubscriber
  • Object
show all
Defined in:
lib/kafka/datadog.rb

Instance Method Summary collapse

Instance Method Details

#ack_message(event) ⇒ Object



185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/kafka/datadog.rb', line 185

# Reports metrics for an ACK'd message described by the event payload.
#
# Reads :client_id, :topic and :delay from event.payload via fetch, so a
# missing key raises KeyError.
def ack_message(event)
  payload = event.payload
  ack_tags = { client: payload.fetch(:client_id), topic: payload.fetch(:topic) }

  # Count of messages ACK'd, tagged by client and topic.
  increment("producer.ack.messages", tags: ack_tags)

  # Distribution of the delay between producing a message and its ACK.
  # The :delay key is looked up only after the counter above was emitted.
  histogram("producer.ack.delay", payload.fetch(:delay), tags: ack_tags)
end

#buffer_overflow(event) ⇒ Object



154
155
156
157
158
159
160
161
# File 'lib/kafka/datadog.rb', line 154

# Increments the "producer.produce.errors" counter, tagged with the
# :client_id and :topic values fetched from event.payload (KeyError if
# either key is absent).
def buffer_overflow(event)
  payload = event.payload
  client_id = payload.fetch(:client_id)
  topic_name = payload.fetch(:topic)

  increment("producer.produce.errors", tags: { client: client_id, topic: topic_name })
end

#deliver_messages(event) ⇒ Object



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/kafka/datadog.rb', line 163

# Reports delivery metrics taken from the event, all tagged by client:
# an error counter when the payload has an :exception key, the event
# duration as latency, the delivered message count, and the attempt count.
#
# :client_id, :delivered_message_count and :attempts are fetched up front,
# in that order, so a missing key raises KeyError before any metric is sent.
def deliver_messages(event)
  payload = event.payload
  client_id, delivered, attempt_count =
    %i[client_id delivered_message_count attempts].map { |key| payload.fetch(key) }

  client_tags = { client: client_id }

  increment("producer.deliver.errors", tags: client_tags) if payload.key?(:exception)

  timing("producer.deliver.latency", event.duration, tags: client_tags)

  # Messages delivered to Kafka.
  count("producer.deliver.messages", delivered, tags: client_tags)

  # Number of attempts made to deliver the messages.
  histogram("producer.deliver.attempts", attempt_count, tags: client_tags)
end

#produce_message(event) ⇒ Object



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/kafka/datadog.rb', line 133

# Reports metrics for a produced message using values fetched from
# event.payload (:client_id, :topic, :buffer_size, :max_buffer_size;
# a missing key raises KeyError).
def produce_message(event)
  payload = event.payload
  client_id = payload.fetch(:client_id)
  topic_name = payload.fetch(:topic)
  current_size = payload.fetch(:buffer_size)
  capacity = payload.fetch(:max_buffer_size)

  # Computed before any metric is emitted; both operands are coerced with to_f.
  fill_ratio = current_size.to_f / capacity.to_f

  client_tags = { client: client_id }

  # Write rate: one increment per produced message, tagged by client and topic.
  increment("producer.produce.messages", tags: { **client_tags, topic: topic_name })

  # Avg/max buffer size per producer.
  histogram("producer.buffer.size", current_size, tags: client_tags)

  # Avg/max buffer fill ratio per producer.
  histogram("producer.buffer.fill_ratio", fill_ratio, tags: client_tags)
end

#topic_error(event) ⇒ Object



198
199
200
201
202
203
204
205
# File 'lib/kafka/datadog.rb', line 198

# Increments the "producer.ack.errors" counter, tagged with the client and
# topic taken from event.payload (fetch raises KeyError on a missing key).
def topic_error(event)
  details = event.payload
  error_tags = {}
  error_tags[:client] = details.fetch(:client_id)
  error_tags[:topic] = details.fetch(:topic)

  increment("producer.ack.errors", tags: error_tags)
end