Module: Deimos::KafkaListener

Defined in:
lib/deimos/instrumentation.rb

Overview

This module listens to events published by RubyKafka.

Class Method Details

.send_produce_error(event) ⇒ Object

Listens for exceptions raised during publishing and re-publishes them as a Deimos 'produce_error' event, one event per topic that had failed messages.

Parameters:

  • event (ActiveSupport::Notifications::Event)


# File 'lib/deimos/instrumentation.rb', line 43

def self.send_produce_error(event)
  exception = event.payload[:exception_object]
  return if !exception || !exception.respond_to?(:failed_messages)

  messages = exception.failed_messages
  messages.group_by(&:topic).each do |topic, batch|
    next if batch.empty?

    producer = batch.first.metadata[:producer_name]
    payloads = batch.map { |m| m.metadata[:decoded_payload] }

    Deimos.config.metrics&.count('publish_error', payloads.size,
                                 tags: %W(topic:#{topic}))
    Deimos.instrument(
      'produce_error',
      producer: producer,
      topic: topic,
      exception_object: exception,
      payloads: payloads
    )
  end
end
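
The per-topic grouping in the method above can be tried in isolation, without Kafka or Deimos loaded. The following is a minimal sketch, not the library's implementation: `FailedMessage` and `summarize_failures` are hypothetical names introduced for illustration, and it assumes each failed message exposes a `topic` and a `metadata` hash containing `:producer_name` and `:decoded_payload`.

```ruby
# Hypothetical stand-in for a failed Kafka message. In the real method these
# objects come from the exception's failed_messages list.
FailedMessage = Struct.new(:topic, :metadata)

# Groups failed messages by topic and builds one summary hash per topic,
# mirroring the arguments the method above passes to Deimos.instrument.
# (summarize_failures is an illustrative helper, not part of Deimos.)
def summarize_failures(messages)
  messages.group_by(&:topic).map do |topic, batch|
    {
      topic: topic,
      # All messages in a batch share a topic; the first one names the producer.
      producer: batch.first.metadata[:producer_name],
      payloads: batch.map { |m| m.metadata[:decoded_payload] }
    }
  end
end

failed = [
  FailedMessage.new('orders', { producer_name: 'OrderProducer', decoded_payload: { 'id' => 1 } }),
  FailedMessage.new('users',  { producer_name: 'UserProducer',  decoded_payload: { 'id' => 7 } }),
  FailedMessage.new('orders', { producer_name: 'OrderProducer', decoded_payload: { 'id' => 2 } })
]

summaries = summarize_failures(failed)
summaries.each do |s|
  puts "#{s[:topic]}: #{s[:payloads].size} failed payload(s) from #{s[:producer]}"
end
```

Because `group_by` never yields an empty group, the `next if batch.empty?` guard in the original method is purely defensive; the sketch omits it.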