Class: Kafka::Interceptors

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/interceptors.rb

Overview

Holds a list of interceptors that implement call and wraps calls to a chain of custom interceptors.

Instance Method Summary collapse

Constructor Details

#initialize(interceptors:, logger:) ⇒ Interceptors

Returns a new instance of Interceptors.



7
8
9
10
# File 'lib/kafka/interceptors.rb', line 7

def initialize(interceptors:, logger:)
  @interceptors = interceptors || []
  @logger = TaggedLogger.new(logger)
end

Instance Method Details

#call(intercepted) ⇒ Kafka::PendingMessage || Kafka::FetchedBatch

This method is called when the client produces a message or once the batches are fetched. The message returned from the first call is passed to the second interceptor call, and so on in an interceptor chain. This method does not throw exceptions.

Parameters:

Returns:



21
22
23
24
25
26
27
28
29
30
31
# File 'lib/kafka/interceptors.rb', line 21

def call(intercepted)
  @interceptors.each do |interceptor|
    begin
      intercepted = interceptor.call(intercepted)
    rescue Exception => e
      @logger.warn "Error executing interceptor for topic: #{intercepted.topic} partition: #{intercepted.partition}: #{e.message}\n#{e.backtrace.join("\n")}"
    end
  end

  intercepted
end