Class: Karafka::Connection::Client
- Inherits: Object
- Extended by: Forwardable
- Defined in: lib/karafka/connection/client.rb
Overview
Class used as a wrapper around the ruby-kafka client. It simplifies the additional features that we provide (or might provide in the future) and hides the internal implementation details.
Instance Method Summary

- #fetch_loop {|kafka| ... } ⇒ Object
  Opens the connection, fetches messages, and calls a block for each incoming message.
- #initialize(consumer_group) ⇒ Karafka::Connection::Client (constructor)
  Creates a queue consumer client that will pull data from Kafka.
- #mark_as_consumed(params) ⇒ Object
  Marks a given message as consumed and commits the offsets.
- #pause(topic, partition) ⇒ Object
  Pauses fetching and consumption of a given topic partition.
- #stop ⇒ Object
  Gracefully stops topic consumption.
- #trigger_heartbeat ⇒ Object
  Triggers a non-optional, blocking heartbeat that notifies Kafka that this consumer/client is still up and running.
Constructor Details
#initialize(consumer_group) ⇒ Karafka::Connection::Client
Creates a queue consumer client that will pull data from Kafka.
```ruby
# File 'lib/karafka/connection/client.rb', line 17

def initialize(consumer_group)
  @consumer_group = consumer_group
  Persistence::Client.write(self)
end
```
Instance Method Details
#fetch_loop {|kafka| ... } ⇒ Object
This will yield raw messages, without any preprocessing or reformatting.
Opens the connection, fetches messages, and calls a block for each incoming message.
```ruby
# File 'lib/karafka/connection/client.rb', line 25

def fetch_loop
  settings = ApiAdapter.consumption(consumer_group)

  if consumer_group.batch_fetching
    kafka_consumer.each_batch(*settings) { |batch| yield(batch.messages) }
  else
    # always yield an array of messages, so we have consistent API (always a batch)
    kafka_consumer.each_message(*settings) { |message| yield([message]) }
  end
rescue Kafka::ProcessingError => error
  # If there was an error during consumption, we have to log it, pause current partition
  # and process other things
  Karafka.monitor.instrument(
    'connection.client.fetch_loop.error',
    caller: self,
    error: error.cause
  )
  pause(error.topic, error.partition)
  retry
end
```
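The batch/single normalization performed above can be sketched in plain Ruby. The `normalized_fetch` helper below is hypothetical (not part of Karafka); it only illustrates that, whichever fetching mode is active, callers always receive arrays of messages.

```ruby
# Hypothetical helper (not Karafka code) illustrating how #fetch_loop
# normalizes both fetching modes into a consistent, batch-shaped API.
def normalized_fetch(messages, batch_fetching, batch_size: 2)
  yielded = []

  if batch_fetching
    # batch mode: messages arrive already grouped into batches
    messages.each_slice(batch_size) { |batch| yielded << batch }
  else
    # single-message mode: wrap each message so callers always get an array
    messages.each { |message| yielded << [message] }
  end

  yielded
end
```

With this shape, downstream consumption code never needs to branch on the fetching mode: `normalized_fetch(%w[a b c], false)` yields three one-element arrays, while batch mode yields the batches themselves.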
#mark_as_consumed(params) ⇒ Object
Unlike ruby-kafka, we commit the offset on each manual marking, to be sure that the offset commit happens as soon as possible in case of a crash.
Marks a given message as consumed and commits the offsets.
```ruby
# File 'lib/karafka/connection/client.rb', line 66

def mark_as_consumed(params)
  kafka_consumer.mark_message_as_processed(
    *ApiAdapter.mark_message_as_processed(params)
  )
  # Trigger an immediate, blocking offset commit in order to minimize the risk of crashing
  # before the automatic triggers have kicked in.
  kafka_consumer.commit_offsets
end
```
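The mark-then-commit-immediately behaviour can be illustrated with a minimal stand-in consumer. The `FakeKafkaConsumer` class below is purely illustrative (only the `mark_message_as_processed` / `commit_offsets` method names mirror the real ruby-kafka consumer):

```ruby
# Illustrative stand-in (not Karafka code) for the "mark, then commit
# immediately" behaviour of #mark_as_consumed.
class FakeKafkaConsumer
  attr_reader :marked_offsets, :pending_commits

  def initialize
    @marked_offsets = []
    @pending_commits = 0
  end

  # Marking alone only records the offset; the commit is still pending
  def mark_message_as_processed(offset)
    @marked_offsets << offset
    @pending_commits += 1
  end

  # Blocking commit: flushes everything marked so far
  def commit_offsets
    @pending_commits = 0
  end
end

consumer = FakeKafkaConsumer.new
consumer.mark_message_as_processed(42)
consumer.commit_offsets
```

The point of committing right after each mark is that a crash between marking and the next automatic commit would otherwise cause the message to be redelivered.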
#pause(topic, partition) ⇒ Object
Pauses fetching and consumption of a given topic partition.
```ruby
# File 'lib/karafka/connection/client.rb', line 58

def pause(topic, partition)
  kafka_consumer.pause(*ApiAdapter.pause(topic, partition, consumer_group))
end
```
#stop ⇒ Object
Stopping running consumers without a really important reason is not recommended: until all the consumers are stopped, the server will keep running, serving only part of the messages.
Gracefully stops topic consumption.
```ruby
# File 'lib/karafka/connection/client.rb', line 50

def stop
  @kafka_consumer&.stop
  @kafka_consumer = nil
end
```
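`#stop` relies on Ruby's safe-navigation operator (`&.`), so calling it on an already-stopped client (where `@kafka_consumer` is `nil`) is a harmless no-op rather than a `NoMethodError`. A minimal sketch of the same idempotent-stop pattern (the `StoppableClient` class is hypothetical):

```ruby
# Minimal sketch (not Karafka code) of the idempotent stop pattern.
class StoppableClient
  def initialize(consumer)
    @consumer = consumer
  end

  def stop
    @consumer&.stop  # no-op when @consumer is already nil
    @consumer = nil
  end

  def stopped?
    @consumer.nil?
  end
end

underlying = Class.new { def stop; end }.new
client = StoppableClient.new(underlying)
client.stop
client.stop # second call is safe: @consumer is already nil
```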
#trigger_heartbeat ⇒ Object
Triggers a non-optional, blocking heartbeat that notifies Kafka that this consumer/client is still up and running.
```ruby
# File 'lib/karafka/connection/client.rb', line 77

def trigger_heartbeat
  kafka_consumer.trigger_heartbeat!
end
```