Class: Karafka::Connection::Client

Inherits: Object
Extended by: Forwardable
Defined in: lib/karafka/connection/client.rb

Overview

A wrapper around the ruby-kafka client that simplifies the additional features we provide (or might provide in the future) and hides the internal implementation details.

Instance Method Summary

Constructor Details

#initialize(consumer_group) ⇒ Karafka::Connection::Client

Creates a queue consumer client that will pull the data from Kafka

Parameters:

  • consumer_group

    consumer group for which the client is created
# File 'lib/karafka/connection/client.rb', line 17

def initialize(consumer_group)
  @consumer_group = consumer_group
  Persistence::Client.write(self)
end

Instance Method Details

#fetch_loop {|kafka| ... } ⇒ Object

Note:

This will yield with raw messages - no preprocessing or reformatting.

Opens a connection, fetches messages, and calls the given block for each batch of incoming messages

Yield Parameters:

  • kafka (Array<Kafka::FetchedMessage>)

    fetched messages



# File 'lib/karafka/connection/client.rb', line 25

def fetch_loop
  settings = ApiAdapter.consumption(consumer_group)

  if consumer_group.batch_fetching
    kafka_consumer.each_batch(*settings) { |batch| yield(batch.messages) }
  else
    # always yield an array of messages, so we have consistent API (always a batch)
    kafka_consumer.each_message(*settings) { |message| yield([message]) }
  end
rescue Kafka::ProcessingError => error
  # If there was an error during consumption, we have to log it, pause current partition
  # and process other things
  Karafka.monitor.instrument(
    'connection.client.fetch_loop.error',
    caller: self,
    error: error.cause
  )
  pause(error.topic, error.partition)
  retry
end
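The key design choice in `fetch_loop` is that both fetching modes yield arrays, so downstream consumer code always receives a batch. A minimal sketch of that pattern, using hypothetical `StubConsumer` and `StubBatch` stand-ins for the ruby-kafka consumer (not the real Karafka internals):

```ruby
# StubBatch and StubConsumer are hypothetical stand-ins for the
# ruby-kafka consumer, used only to illustrate the consistent-batch API.
StubBatch = Struct.new(:messages)

class StubConsumer
  def initialize(messages)
    @messages = messages
  end

  # Yields all messages wrapped in a single batch
  def each_batch
    yield StubBatch.new(@messages)
  end

  # Yields messages one by one
  def each_message
    @messages.each { |message| yield message }
  end
end

def fetch_loop(consumer, batch_fetching:)
  received = []
  if batch_fetching
    consumer.each_batch { |batch| received << batch.messages }
  else
    # Wrap each message in an array, so the API is consistent (always a batch)
    consumer.each_message { |message| received << [message] }
  end
  received
end

consumer = StubConsumer.new(%w[m1 m2 m3])
fetch_loop(consumer, batch_fetching: true)   # => [["m1", "m2", "m3"]]
fetch_loop(consumer, batch_fetching: false)  # => [["m1"], ["m2"], ["m3"]]
```

Either way, the block passed to the real `fetch_loop` can treat its argument as an array of fetched messages, regardless of the `batch_fetching` setting.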

#mark_as_consumed(params) ⇒ Object

Note:

Unlike ruby-kafka, we commit the offset on each manual marking, to ensure the offset commit happens as soon as possible in case of a crash

Marks a given message as consumed and commits the offsets

Parameters:

  • params (Karafka::Params::Params)

    message that we want to mark as processed



# File 'lib/karafka/connection/client.rb', line 66

def mark_as_consumed(params)
  kafka_consumer.mark_message_as_processed(
    *ApiAdapter.mark_message_as_processed(params)
  )
  # Trigger an immediate, blocking offset commit in order to minimize the risk of crashing
  # before the automatic triggers have kicked in.
  kafka_consumer.commit_offsets
end
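The trade-off here is one blocking commit per marked message: safer against crashes, at the cost of more round trips. A sketch of the "mark then commit immediately" pattern, with a hypothetical `CommitTrackingConsumer` in place of the ruby-kafka consumer so the calls can be counted:

```ruby
# Hypothetical stand-in for the ruby-kafka consumer; it only records
# which messages were marked and how many commits were triggered.
class CommitTrackingConsumer
  attr_reader :marked, :commits

  def initialize
    @marked = []
    @commits = 0
  end

  def mark_message_as_processed(message)
    @marked << message
  end

  def commit_offsets
    @commits += 1
  end
end

def mark_as_consumed(consumer, message)
  consumer.mark_message_as_processed(message)
  # Commit right away, so a crash after this point cannot lose the offset
  consumer.commit_offsets
end

consumer = CommitTrackingConsumer.new
3.times { |i| mark_as_consumed(consumer, "message-#{i}") }
consumer.commits # => 3 (one blocking commit per marked message)
```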

#pause(topic, partition) ⇒ Object

Pauses fetching and consumption of a given topic partition

Parameters:

  • topic (String)

    topic that we want to pause

  • partition (Integer)

partition number that we want to pause



# File 'lib/karafka/connection/client.rb', line 58

def pause(topic, partition)
  kafka_consumer.pause(*ApiAdapter.pause(topic, partition, consumer_group))
end

#stop ⇒ Object

Note:

Stopping running consumers without a really important reason is not recommended, as until all the consumers are stopped, the server will keep running and serving only part of the messages

Gracefully stops topic consumption



# File 'lib/karafka/connection/client.rb', line 50

def stop
  @kafka_consumer&.stop
  @kafka_consumer = nil
end
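Note how the safe navigation operator (`&.`) combined with nilling out `@kafka_consumer` makes `#stop` idempotent: a second call sees `nil` and does nothing. A small sketch of that pattern, with a hypothetical `StubKafkaConsumer` standing in for the underlying consumer:

```ruby
# Hypothetical stand-in for the underlying kafka consumer
class StubKafkaConsumer
  attr_reader :stopped

  def stop
    @stopped = true
  end
end

class ClientSketch
  def initialize(kafka_consumer)
    @kafka_consumer = kafka_consumer
  end

  def stop
    # &. skips the call when @kafka_consumer is already nil,
    # so calling #stop twice is harmless
    @kafka_consumer&.stop
    @kafka_consumer = nil
  end
end

inner = StubKafkaConsumer.new
client = ClientSketch.new(inner)
client.stop
client.stop # second call is a no-op thanks to &.
inner.stopped # => true
```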

#trigger_heartbeat ⇒ Object

Triggers a non-optional, blocking heartbeat that notifies Kafka that this consumer/client is still up and running



# File 'lib/karafka/connection/client.rb', line 77

def trigger_heartbeat
  kafka_consumer.trigger_heartbeat!
end