Class: Karafka::Connection::Client

Inherits:
Object
Extended by:
Forwardable
Defined in:
lib/karafka/connection/client.rb

Overview

Class used as a wrapper around the ruby-kafka client. It simplifies the additional features that we provide (or might provide in the future) and hides the internal implementation.

Instance Method Summary

Constructor Details

#initialize(consumer_group) ⇒ Karafka::Connection::Client

Creates a queue consumer client that will pull the data from Kafka

Parameters:

  • consumer_group (Karafka::Routing::ConsumerGroup)

    consumer group for which we create the client
# File 'lib/karafka/connection/client.rb', line 23

def initialize(consumer_group)
  @consumer_group = consumer_group
  Persistence::Client.write(self)
end
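The constructor registers the freshly built client via `Persistence::Client.write(self)` so other components can fetch the client for the current thread. A minimal sketch of that thread-local write/read pattern, assuming hypothetical names (`ThreadLocalClient` is a stand-in, not the real `Karafka::Persistence::Client` API):

```ruby
# Hypothetical stand-in for the thread-local persistence pattern used by
# Persistence::Client.write(self). The class name and storage key are
# assumptions for illustration only.
class ThreadLocalClient
  KEY = :client

  # Store the client for the current thread
  def self.write(client)
    Thread.current[KEY] = client
  end

  # Fetch the client previously stored for this thread
  def self.read
    Thread.current[KEY]
  end
end

ThreadLocalClient.write(:my_client)
ThreadLocalClient.read # => :my_client
```

Because the storage is keyed off `Thread.current`, each consumer thread sees only its own client.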

Instance Method Details

#fetch_loop {|kafka| ... } ⇒ Object

Note:

This yields raw messages with no preprocessing or reformatting.

Opens the connection, fetches messages, and calls the given block for each incoming message (or batch of messages)

Yield Parameters:

  • kafka (Array<Kafka::FetchedMessage>, Symbol)

    the fetched message or batch of messages, together with a symbol (:batch or :message) indicating the type of fetcher being used



# File 'lib/karafka/connection/client.rb', line 32

def fetch_loop
  settings = ApiAdapter.consumption(consumer_group)

  if consumer_group.batch_fetching
    kafka_consumer.each_batch(**settings) { |batch| yield(batch, :batch) }
  else
    kafka_consumer.each_message(**settings) { |message| yield(message, :message) }
  end
# @note We catch only the processing errors as any other are considered critical (exceptions)
#   and should require a client restart with a backoff
rescue Kafka::ProcessingError => e
  # If there was an error during consumption, we have to log it, pause current partition
  # and process other things
  Karafka.monitor.instrument(
    'connection.client.fetch_loop.error',
    caller: self,
    error: e.cause
  )
  pause(e.topic, e.partition)
  retry
end
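Callers are expected to branch on the second yielded argument to tell batches apart from single messages. A self-contained sketch of that yield contract, using a fake client instead of a live Kafka connection (`FakeClient` and its messages are hypothetical stand-ins):

```ruby
# Fake client mimicking the (data, type) yield contract of #fetch_loop.
# FakeClient is a hypothetical stand-in; a real client would be fed by a
# live Kafka connection instead of an in-memory array.
class FakeClient
  def initialize(messages, batch_fetching:)
    @messages = messages
    @batch_fetching = batch_fetching
  end

  def fetch_loop
    if @batch_fetching
      # Batch mode: the whole collection is yielded once, tagged :batch
      yield(@messages, :batch)
    else
      # Message mode: each message is yielded separately, tagged :message
      @messages.each { |message| yield(message, :message) }
    end
  end
end

received = []
FakeClient.new(%w[a b c], batch_fetching: true).fetch_loop do |data, type|
  case type
  when :batch then received.concat(data)
  when :message then received << data
  end
end
received # => ["a", "b", "c"]
```

The same block body handles both fetching modes, which is why the symbol is yielded alongside the data.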

#mark_as_consumed(params) ⇒ Object

Note:

This method won’t trigger an immediate offset commit; it relies on ruby-kafka’s time-interval based offset committing

Marks given message as consumed

Parameters:

  • params (Karafka::Params::Params)

    message that we want to mark as consumed
# File 'lib/karafka/connection/client.rb', line 74

def mark_as_consumed(params)
  kafka_consumer.mark_message_as_processed(
    *ApiAdapter.mark_message_as_processed(params)
  )
end

#mark_as_consumed!(params) ⇒ Object

Note:

This method commits the offset for each manual marking to make sure that the offset commit happens as soon as possible in case of a crash

Marks a given message as consumed and commits the offsets in a blocking way

Parameters:

  • params (Karafka::Params::Params)

    message that we want to mark as consumed
# File 'lib/karafka/connection/client.rb', line 84

def mark_as_consumed!(params)
  mark_as_consumed(params)
  # Trigger an immediate, blocking offset commit in order to minimize the risk of crashing
  # before the automatic triggers have kicked in.
  kafka_consumer.commit_offsets
end
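The only difference between #mark_as_consumed and #mark_as_consumed! is whether commit_offsets is invoked immediately. A sketch of that difference with a fake consumer that counts blocking commits (all names here are hypothetical stand-ins for the ruby-kafka consumer):

```ruby
# Fake consumer counting marks and blocking commits, illustrating the
# difference between #mark_as_consumed and #mark_as_consumed!.
# FakeKafkaConsumer is a hypothetical stand-in, not the ruby-kafka class.
class FakeKafkaConsumer
  attr_reader :marked, :commits

  def initialize
    @marked = []
    @commits = 0
  end

  def mark_message_as_processed(message)
    @marked << message
  end

  def commit_offsets
    @commits += 1
  end
end

consumer = FakeKafkaConsumer.new

# Like #mark_as_consumed: mark only; the commit happens later on the
# time-interval trigger, so no blocking commit is issued here.
consumer.mark_message_as_processed(:msg_1)

# Like #mark_as_consumed!: mark, then commit immediately in a blocking way.
consumer.mark_message_as_processed(:msg_2)
consumer.commit_offsets

consumer.marked  # => [:msg_1, :msg_2]
consumer.commits # => 1
```

The bang variant trades a little throughput (one blocking commit per marking) for a smaller redelivery window after a crash.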

#pause(topic, partition) ⇒ Object

Pauses fetching and consumption of a given topic partition

Parameters:

  • topic (String)

    topic that we want to pause

  • partition (Integer)

    partition number that we want to pause



# File 'lib/karafka/connection/client.rb', line 66

def pause(topic, partition)
  kafka_consumer.pause(*ApiAdapter.pause(topic, partition, consumer_group))
end
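The real pausing work is delegated to ruby-kafka, but conceptually a pause just records a topic/partition pair that should be skipped while other partitions keep flowing (as #fetch_loop does after a processing error). A toy sketch of that bookkeeping, with hypothetical names:

```ruby
require 'set'

# Toy bookkeeping of paused topic partitions. PausedPartitions is a
# hypothetical stand-in: the real pausing is handled inside ruby-kafka's
# consumer, not by Karafka::Connection::Client itself.
class PausedPartitions
  def initialize
    @paused = Set.new
  end

  # Record that this topic partition should not be fetched for now
  def pause(topic, partition)
    @paused << [topic, partition]
  end

  def paused?(topic, partition)
    @paused.include?([topic, partition])
  end
end

tracker = PausedPartitions.new
tracker.pause('events', 0)
tracker.paused?('events', 0) # => true
tracker.paused?('events', 1) # => false
```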

#stop ⇒ Object

Note:

Stopping running consumers without a really important reason is not recommended: until all the consumers are stopped, the server will keep running while serving only part of the messages

Gracefully stops topic consumption



# File 'lib/karafka/connection/client.rb', line 58

def stop
  @kafka_consumer&.stop
  @kafka_consumer = nil
end