Class: Karafka::Connection::Listener
- Inherits:
-
Object
- Object
- Karafka::Connection::Listener
- Defined in:
- lib/karafka/connection/listener.rb
Overview
It does not loop on itself - it needs to be executed in a loop
Listener itself does nothing with the message - it will return to the block a raw Kafka::FetchedMessage
A single listener that listens to incoming messages from a single route
Instance Attribute Summary collapse
-
#consumer_group ⇒ Object
readonly
Returns the value of attribute consumer_group.
Instance Method Summary collapse
-
#fetch_loop(block) {|consumer, kafka| ... } ⇒ Object
Opens connection, gets messages and calls a block for each of the incoming messages.
-
#initialize(consumer_group) ⇒ Karafka::Connection::Listener
constructor
Listener instance.
Constructor Details
#initialize(consumer_group) ⇒ Karafka::Connection::Listener
Returns listener instance.
15 16 17 |
# File 'lib/karafka/connection/listener.rb', line 15 def initialize(consumer_group) @consumer_group = consumer_group end |
Instance Attribute Details
#consumer_group ⇒ Object (readonly)
Returns the value of attribute consumer_group.
10 11 12 |
# File 'lib/karafka/connection/listener.rb', line 10 def consumer_group @consumer_group end |
Instance Method Details
#fetch_loop(block) {|consumer, kafka| ... } ⇒ Object
This will yield with a raw message - no preprocessing or reformatting
We catch all the errors here, so they don’t affect other listeners (or this one) so we will be able to listen and consume other incoming messages. Since it is run inside Karafka::Connection::ActorCluster - catching all the exceptions won’t crash the whole cluster. Here we mostly focus on catchin the exceptions related to Kafka connections / Internet connection issues / Etc. Business logic problems should not propagate this far
Opens connection, gets messages and calls a block for each of the incoming messages
29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/karafka/connection/listener.rb', line 29 def fetch_loop(block) consumer.fetch_loop do || block.call(consumer_group.id, ) end # This is on purpose - see the notes for this method # rubocop:disable RescueException rescue Exception => e # rubocop:enable RescueException Karafka.monitor.notice_error(self.class, e) @consumer&.stop retry if @consumer end |