Class: Karafka::Connection::Listener

Inherits:
Object
  • Object
show all
Defined in:
lib/karafka/connection/listener.rb

Overview

Note:

It does not loop on itself - it needs to be executed in a loop

Note:

Listener itself does nothing with the message - it will return to the block a raw Kafka::FetchedMessage

A single listener that listens to incoming messages from a single route

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(consumer_group) ⇒ Karafka::Connection::Listener

Returns listener instance.

Parameters:



15
16
17
# File 'lib/karafka/connection/listener.rb', line 15

def initialize(consumer_group)
  @consumer_group = consumer_group
end

Instance Attribute Details

#consumer_groupObject (readonly)

Returns the value of attribute consumer_group.



10
11
12
# File 'lib/karafka/connection/listener.rb', line 10

def consumer_group
  @consumer_group
end

Instance Method Details

#fetch_loop(block) {|consumer, kafka| ... } ⇒ Object

Note:

This will yield with a raw message - no preprocessing or reformatting

Note:

We catch all the errors here, so they don’t affect other listeners (or this one) so we will be able to listen and consume other incoming messages. Since it is run inside Karafka::Connection::ActorCluster - catching all the exceptions won’t crash the whole cluster. Here we mostly focus on catchin the exceptions related to Kafka connections / Internet connection issues / Etc. Business logic problems should not propagate this far

Opens connection, gets messages and calls a block for each of the incoming messages

Yield Parameters:

  • consumer (String)

    group id

  • kafka (Array<Kafka::FetchedMessage>)

    fetched messages



29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/karafka/connection/listener.rb', line 29

def fetch_loop(block)
  consumer.fetch_loop do |raw_messages|
    block.call(consumer_group.id, raw_messages)
  end
  # This is on purpose - see the notes for this method
  # rubocop:disable RescueException
rescue Exception => e
  # rubocop:enable RescueException
  Karafka.monitor.notice_error(self.class, e)
  @consumer&.stop
  retry if @consumer
end