Module: Deimos::KafkaListener

Defined in:
lib/deimos/instrumentation.rb

Overview

This module listens to events published by RubyKafka.

Class Method Summary

Class Method Details

.handle_exception_with_messages(exception) ⇒ Object

Parameters:

  • exception (Exception)


# File 'lib/deimos/instrumentation.rb', line 47

def self.handle_exception_with_messages(exception)
  messages = exception.failed_messages
  messages.group_by(&:topic).each do |topic, batch|
    producer = Deimos::Producer.descendants.find { |c| c.topic == topic }
    next if batch.empty? || !producer

    decoder = Deimos.schema_backend(schema: producer.config[:schema],
                                    namespace: producer.config[:namespace])
    payloads = batch.map { |m| decoder.decode(m.value) }

    Deimos.config.metrics&.increment(
      'publish_error',
      tags: %W(topic:#{topic}),
      by: payloads.size
    )
    Deimos.instrument(
      'produce_error',
      producer: producer,
      topic: topic,
      exception_object: exception,
      payloads: payloads
    )
  end
end
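
The grouping step in the method above can be illustrated in isolation. The following is a minimal sketch, not part of Deimos: `FakeMessage` is a hypothetical stand-in for the message objects returned by `failed_messages`, carrying only the `topic` and `value` fields the method reads. It shows how `group_by(&:topic)` yields one batch per topic, which is what drives the per-topic `by:` count on the `publish_error` metric.

```ruby
# Hypothetical stand-in for a failed Kafka message; only the fields
# read by handle_exception_with_messages are modeled here.
FakeMessage = Struct.new(:topic, :value)

failed = [
  FakeMessage.new('orders', 'a'),
  FakeMessage.new('users', 'b'),
  FakeMessage.new('orders', 'c')
]

# One batch per topic, in order of first appearance.
batches = failed.group_by(&:topic)

# The size of each batch is what would be reported per topic.
counts_by_topic = batches.transform_values(&:size)
```

Because `group_by` only creates a key when at least one element maps to it, each batch in this sketch contains at least one message.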

.send_produce_error(event) ⇒ void

This method returns an undefined value.

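Listens for exceptions raised during publishing. If the exception carries failed messages, they are re-published as a Deimos produce_error event; otherwise only the publish_error metric is incremented.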

Parameters:

  • event (ActiveSupport::Notifications::Event)


# File 'lib/deimos/instrumentation.rb', line 76

def self.send_produce_error(event)
  exception = event.payload[:exception_object]
  return unless exception

  if exception.respond_to?(:failed_messages)
    handle_exception_with_messages(exception)
  else
    Deimos.config.metrics&.increment(
      'publish_error',
      by: event.payload[:message_count] || 1
    )
  end
end
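
The branching in the method above can be sketched without Deimos or RubyKafka loaded. Everything below is an assumption for illustration: `DeliveryError` is a hypothetical exception class that exposes `failed_messages`, and `error_count` is a hypothetical helper that mirrors only the decision of how many messages an error accounts for. It simplifies the real behavior, which counts per topic and skips topics with no matching producer.

```ruby
# Hypothetical exception exposing failed_messages; not a real
# RubyKafka class.
class DeliveryError < StandardError
  attr_reader :failed_messages

  def initialize(failed_messages)
    super('delivery failed')
    @failed_messages = failed_messages
  end
end

# Hypothetical helper: returns the number of messages an error event
# accounts for, following the same branches as send_produce_error.
def error_count(payload)
  exception = payload[:exception_object]
  return 0 unless exception

  if exception.respond_to?(:failed_messages)
    # The exception knows exactly which messages failed.
    exception.failed_messages.size
  else
    # Fall back to the event's message count, or 1 if absent.
    payload[:message_count] || 1
  end
end

with_messages = error_count(exception_object: DeliveryError.new(%w[a b c]))
with_count    = error_count(exception_object: StandardError.new('boom'),
                            message_count: 5)
fallback      = error_count(exception_object: StandardError.new('boom'))
no_exception  = error_count({})
```

Checking `respond_to?(:failed_messages)` rather than the exception's class keeps the listener decoupled from any specific error type.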