Class: Karafka::Connection::Consumer
- Inherits:
-
Object
- Object
- Karafka::Connection::Consumer
- Defined in:
- lib/karafka/connection/consumer.rb
Overview
Class used as a wrapper around Ruby-Kafka to simplify additional features that we provide/might provide in future and to hide the internal implementation
Instance Method Summary collapse
-
#fetch_loop {|kafka| ... } ⇒ Object
Opens connection, gets messages and calls a block for each of the incoming messages.
-
#initialize(consumer_group) ⇒ Karafka::Connection::Consumer
constructor
Creates a queue consumer that will pull the data from Kafka.
-
#mark_as_consumed(params) ⇒ Object
Marks a given message as consumed and commit the offsets.
-
#pause(topic, partition) ⇒ Object
Pauses fetching and consumption of a given topic partition.
-
#stop ⇒ Object
Gracefuly stops topic consumption.
Constructor Details
#initialize(consumer_group) ⇒ Karafka::Connection::Consumer
Creates a queue consumer that will pull the data from Kafka
13 14 15 16 |
# File 'lib/karafka/connection/consumer.rb', line 13 def initialize(consumer_group) @consumer_group = consumer_group Persistence::Consumer.write(self) end |
Instance Method Details
#fetch_loop {|kafka| ... } ⇒ Object
This will yield with raw messages - no preprocessing or reformatting.
Opens connection, gets messages and calls a block for each of the incoming messages
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/karafka/connection/consumer.rb', line 21 def fetch_loop send( consumer_group.batch_fetching ? :consume_each_batch : :consume_each_message ) { || yield() } rescue Kafka::ProcessingError => e # If there was an error during consumption, we have to log it, pause current partition # and process other things Karafka.monitor.notice_error(self.class, e.cause) pause(e.topic, e.partition) retry # This is on purpose - see the notes for this method # rubocop:disable RescueException rescue Exception => e # rubocop:enable RescueException Karafka.monitor.notice_error(self.class, e) retry end |
#mark_as_consumed(params) ⇒ Object
In opposite to ruby-kafka, we commit the offset for each manual marking to be sure that offset commit happen asap in case of a crash
Marks a given message as consumed and commit the offsets
62 63 64 65 66 67 |
# File 'lib/karafka/connection/consumer.rb', line 62 def mark_as_consumed(params) kafka_consumer.(params) # Trigger an immediate, blocking offset commit in order to minimize the risk of crashing # before the automatic triggers have kicked in. kafka_consumer.commit_offsets end |
#pause(topic, partition) ⇒ Object
Pauses fetching and consumption of a given topic partition
51 52 53 54 55 56 |
# File 'lib/karafka/connection/consumer.rb', line 51 def pause(topic, partition) settings = ConfigAdapter.pausing(consumer_group) timeout = settings[:timeout] raise(Errors::InvalidPauseTimeout, timeout) unless timeout.positive? kafka_consumer.pause(topic, partition, settings) end |
#stop ⇒ Object
Stopping running consumers without a really important reason is not recommended as until all the consumers are stopped, the server will keep running serving only part of the messages
Gracefuly stops topic consumption
43 44 45 46 |
# File 'lib/karafka/connection/consumer.rb', line 43 def stop @kafka_consumer&.stop @kafka_consumer = nil end |