Class: Karafka::Routing::Topic

Inherits:
Object
  • Object
Extended by:
Helpers::ConfigRetriever
Defined in:
lib/karafka/routing/topic.rb

Overview

Topic stores all the details on how we should interact with a given Kafka topic. It belongs to a consumer group, as from 0.6 all the topics can work in the same consumer group. It is a part of Karafka’s DSL.

Instance Attribute Summary

Instance Method Summary

Methods included from Helpers::ConfigRetriever

config_retriever_for

Constructor Details

#initialize(name, consumer_group) ⇒ Topic

Returns a new instance of Topic.

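Parameters:

  • name

    name of the topic; stored as a String via to_s

  • consumer_group

    consumer group that owns this topic; its id is used as the prefix of the topic id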

# File 'lib/karafka/routing/topic.rb', line 16

def initialize(name, consumer_group)
  @name = name.to_s
  @consumer_group = consumer_group
  @attributes = {}
  # @note We use an identifier related to the consumer group that owns a topic, because from
  #   Karafka 0.6 we can handle multiple Kafka instances with the same process and we can
  #   have the same topic name across multiple Kafkas
  @id = "#{consumer_group.id}_#{@name}"
end
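
The id composition above can be illustrated with a minimal standalone sketch. It does not load Karafka: IdSketchGroup and IdSketchTopic are hypothetical stand-ins that only copy the constructor logic shown above, and the group ids are made up for illustration.

```ruby
# Hypothetical stand-in for a consumer group; only #id is needed here.
IdSketchGroup = Struct.new(:id)

# Hypothetical stand-in that copies the constructor logic shown above.
class IdSketchTopic
  attr_reader :name, :id, :consumer_group

  def initialize(name, consumer_group)
    @name = name.to_s
    @consumer_group = consumer_group
    # The owning group's id is used as a prefix, so equal topic names stay distinct.
    @id = "#{consumer_group.id}_#{@name}"
  end
end

group_a = IdSketchGroup.new('cluster_a')
group_b = IdSketchGroup.new('cluster_b')

topic_a = IdSketchTopic.new(:events, group_a)
topic_b = IdSketchTopic.new(:events, group_b)

puts topic_a.id # cluster_a_events
puts topic_b.id # cluster_b_events
```

Both topics share the name "events", yet their ids differ because each one is prefixed with the id of its owning group.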

Instance Attribute Details

#consumer ⇒ Object

Returns the value of attribute consumer.



# File 'lib/karafka/routing/topic.rb', line 12

def consumer
  @consumer
end

#consumer_group ⇒ Object (readonly)

Returns the value of attribute consumer_group.



# File 'lib/karafka/routing/topic.rb', line 11

def consumer_group
  @consumer_group
end

#id ⇒ Object (readonly)

Returns the value of attribute id.



# File 'lib/karafka/routing/topic.rb', line 11

def id
  @id
end

Instance Method Details

#build ⇒ Object

Initializes default values for all the options that support defaults if their values are not yet specified. This needs to be done eagerly (it cannot be lazy loaded on first use) because everywhere except the Karafka server command those values would not be initialized in time, for example in Sidekiq.



# File 'lib/karafka/routing/topic.rb', line 30

def build
  Karafka::AttributesMap.topic.each { |attr| send(attr) }
  consumer&.topic = self
  self
end
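
A minimal standalone sketch of this eager initialization, under stated assumptions: BUILD_SKETCH_ATTRIBUTES stands in for Karafka::AttributesMap.topic, and the attribute names, default values, and the BuildSketchConsumer and BuildSketchTopic classes are hypothetical, chosen only for illustration.

```ruby
# Hypothetical attribute list standing in for Karafka::AttributesMap.topic.
BUILD_SKETCH_ATTRIBUTES = %i[backend batch_size].freeze

# Hypothetical defaults; in Karafka these would come from the app configuration.
BUILD_SKETCH_DEFAULTS = { backend: :inline, batch_size: 10 }.freeze

# Hypothetical consumer stand-in that can be told which topic it serves.
class BuildSketchConsumer
  attr_accessor :topic
end

class BuildSketchTopic
  attr_accessor :consumer

  # Each reader stores its default on first call, so calling it initializes it.
  BUILD_SKETCH_ATTRIBUTES.each do |attr|
    define_method(attr) do
      ivar = "@#{attr}"
      return instance_variable_get(ivar) if instance_variable_defined?(ivar)

      instance_variable_set(ivar, BUILD_SKETCH_DEFAULTS.fetch(attr))
    end
  end

  # Same shape as #build above: touch every attribute, link the consumer, return self.
  def build
    BUILD_SKETCH_ATTRIBUTES.each { |attr| send(attr) }
    consumer&.topic = self
    self
  end
end

topic = BuildSketchTopic.new
topic.consumer = BuildSketchConsumer.new
built = topic.build

puts topic.instance_variables.inspect
puts built.equal?(topic)
```

After build, every attribute already holds its value, so code running outside the server process does not depend on a first read to populate it. The safe navigation operator keeps build usable when no consumer has been assigned yet.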

#responder ⇒ Class?

Returns the Class (not an instance) of a responder that should respond from the consumer back to Kafka (useful for piping data flows).

Returns:

  • (Class, nil)

    Class (not an instance) of a responder that should respond from the consumer back to Kafka (useful for piping data flows)



# File 'lib/karafka/routing/topic.rb', line 38

def responder
  @responder ||= Karafka::Responders::Builder.new(consumer).build
end
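
Since the documented return type is (Class, nil), it may help to see how the `||=` memoization behaves in each case. The sketch below does not use Karafka::Responders::Builder; CountingBuilder and ResponderSketchTopic are hypothetical stand-ins, and String merely plays the role of a responder class.

```ruby
# Hypothetical builder stand-in that counts how often it is asked to build.
class CountingBuilder
  attr_reader :calls

  def initialize(result)
    @result = result
    @calls = 0
  end

  def build
    @calls += 1
    @result
  end
end

# Hypothetical topic stand-in using the same ||= memoization as #responder above.
class ResponderSketchTopic
  def initialize(builder)
    @builder = builder
  end

  def responder
    @responder ||= @builder.build
  end
end

found = CountingBuilder.new(String) # String stands in for a responder class
topic_with = ResponderSketchTopic.new(found)
2.times { topic_with.responder }

missing = CountingBuilder.new(nil) # no responder could be resolved
topic_without = ResponderSketchTopic.new(missing)
2.times { topic_without.responder }

puts found.calls   # 1
puts missing.calls # 2
```

A resolved class is cached after the first call, while a nil result is not cached by `||=`, so the builder is consulted again on each call.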

#to_h ⇒ Hash

Note:

This is used when we validate the consumer_group and its topics.

Returns hash with all the topic attributes.

Returns:

  • (Hash)

    hash with all the topic attributes



# File 'lib/karafka/routing/topic.rb', line 48

def to_h
  map = Karafka::AttributesMap.topic.map do |attribute|
    [attribute, public_send(attribute)]
  end

  Hash[map].merge!(
    id: id,
    consumer: consumer
  )
end
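
As a rough illustration of the resulting hash shape, here is a standalone sketch. HASH_SKETCH_ATTRIBUTES is a hypothetical stand-in for Karafka::AttributesMap.topic, and HashSketchTopic with its attribute names and values is invented for the example.

```ruby
# Hypothetical attribute list standing in for Karafka::AttributesMap.topic.
HASH_SKETCH_ATTRIBUTES = %i[name backend].freeze

# Hypothetical topic stand-in with two mapped attributes plus id and consumer.
class HashSketchTopic
  attr_reader :id, :name, :backend
  attr_accessor :consumer

  def initialize(id, name, backend)
    @id = id
    @name = name
    @backend = backend
  end

  # Same idea as #to_h above: mapped attributes first, then id and consumer merged in.
  def to_h
    pairs = HASH_SKETCH_ATTRIBUTES.map do |attribute|
      [attribute, public_send(attribute)]
    end

    pairs.to_h.merge(id: id, consumer: consumer)
  end
end

topic = HashSketchTopic.new('group_events', 'events', :inline)
p topic.to_h
```

The id and consumer keys are merged in separately because, in this sketch as in the method above, they are not part of the mapped attribute list.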