Class: Karafka::Routing::Topic

Inherits:
Object
Extended by:
Helpers::ConfigRetriever
Defined in:
lib/karafka/routing/topic.rb

Overview

Topic stores all the details on how we should interact with a given Kafka topic. It belongs to a consumer group because, as of 0.6, all topics can work in the same consumer group. It is a part of Karafka’s DSL.


Methods included from Helpers::ConfigRetriever

config_retriever_for

Constructor Details

#initialize(name, consumer_group) ⇒ Topic



# File 'lib/karafka/routing/topic.rb', line 16

def initialize(name, consumer_group)
  @name = name.to_s
  @consumer_group = consumer_group
  @attributes = {}
  # @note We use an identifier tied to the consumer group that owns a topic, because from
  #   Karafka 0.6 we can handle multiple Kafka instances within the same process and we can
  #   have the same topic name across multiple Kafkas
  @id = "#{consumer_group.id}_#{@name}"
end
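
The identifier scheme used by the constructor can be illustrated in isolation. The following is a minimal sketch, not Karafka's own code: `FakeConsumerGroup` and `IdSketchTopic` are hypothetical stand-ins, and the group ids are invented for the example.

```ruby
# Hypothetical stand-in for a consumer group; only #id matters here.
FakeConsumerGroup = Struct.new(:id)

# Simplified stand-in that mirrors the constructor shown above.
class IdSketchTopic
  attr_reader :name, :consumer_group, :id

  def initialize(name, consumer_group)
    @name = name.to_s
    @consumer_group = consumer_group
    # Prefixing with the group id keeps ids distinct when two groups
    # (possibly on different Kafka clusters) use the same topic name
    @id = "#{consumer_group.id}_#{@name}"
  end
end

# Invented group ids, for illustration only
group_a = FakeConsumerGroup.new('cluster_a_group')
group_b = FakeConsumerGroup.new('cluster_b_group')

topic_a = IdSketchTopic.new(:events, group_a)
topic_b = IdSketchTopic.new(:events, group_b)

puts topic_a.id
puts topic_b.id
```

Both topics share the name `events`, yet their ids differ because each id carries the id of the owning consumer group.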

Instance Attribute Details

#consumer ⇒ Object

Returns the value of attribute consumer.



# File 'lib/karafka/routing/topic.rb', line 12

def consumer
  @consumer
end

#consumer_group ⇒ Object (readonly)

Returns the value of attribute consumer_group.



# File 'lib/karafka/routing/topic.rb', line 11

def consumer_group
  @consumer_group
end

#id ⇒ Object (readonly)

Returns the value of attribute id.



# File 'lib/karafka/routing/topic.rb', line 11

def id
  @id
end

Instance Method Details

#build ⇒ Object

Initializes default values for all the options that support defaults if their values are not yet specified. This needs to be done eagerly (it cannot be lazy loaded on first use) because everywhere except the Karafka server command, those values would not be initialized in time, for example for Sidekiq.



# File 'lib/karafka/routing/topic.rb', line 30

def build
  Karafka::AttributesMap.topic.each { |attr| send(attr) }
  consumer&.topic = self
  self
end
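
The eager-initialization idea behind `#build` can be sketched as follows. This is a hypothetical illustration, not Karafka's implementation: `EagerTopic`, its `ATTRIBUTES` list, and the `DEFAULTS` values are all assumptions made for the example, whereas Karafka reads its attribute list from `Karafka::AttributesMap.topic`.

```ruby
# Hypothetical sketch: lazily defaulted attributes that #build forces eagerly.
class EagerTopic
  # Assumed attribute list, invented for this example
  ATTRIBUTES = %i[max_bytes backend].freeze
  # Assumed default values, invented for this example
  DEFAULTS = { max_bytes: 1024, backend: :inline }.freeze

  ATTRIBUTES.each do |attr|
    attr_writer attr

    # The reader stores the default the first time it is called
    # if no value was assigned explicitly
    define_method(attr) do
      ivar = :"@#{attr}"
      return instance_variable_get(ivar) if instance_variable_defined?(ivar)

      instance_variable_set(ivar, DEFAULTS.fetch(attr))
    end
  end

  # Calls every reader once so that all defaults are materialized up front
  def build
    ATTRIBUTES.each { |attr| send(attr) }
    self
  end
end

topic = EagerTopic.new
topic.backend = :sidekiq # explicit value, must survive #build
topic.build
```

After `build`, every attribute holds a concrete value, so code running outside the process that defined the routes does not depend on a reader having been called earlier.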

#responder ⇒ Class?



# File 'lib/karafka/routing/topic.rb', line 38

def responder
  @responder ||= Karafka::Responders::Builder.new(consumer).build
end

#to_h ⇒ Hash

Note:

This is used when we validate the consumer_group and its topics.

Returns hash with all the topic attributes.



# File 'lib/karafka/routing/topic.rb', line 48

def to_h
  map = Karafka::AttributesMap.topic.map do |attribute|
    [attribute, public_send(attribute)]
  end

  Hash[map].merge!(
    id: id,
    consumer: consumer
  )
end
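
The shape of the hash returned by `#to_h` can be illustrated with a small self-contained sketch. `HashableTopic` and its `ATTRIBUTES` list are hypothetical; only the pattern (map the attribute names to name/value pairs, then merge in `id` and `consumer`) follows the method above.

```ruby
# Hypothetical sketch of the attribute-list-to-hash pattern used by #to_h.
class HashableTopic
  # Assumed attribute list, invented for this example
  ATTRIBUTES = %i[name max_bytes].freeze

  attr_reader :id, :consumer, *ATTRIBUTES

  def initialize(id:, name:, max_bytes:, consumer: nil)
    @id = id
    @name = name
    @max_bytes = max_bytes
    @consumer = consumer
  end

  # Builds [attribute, value] pairs through the public readers, then adds
  # the two entries that are not part of the attribute list
  def to_h
    ATTRIBUTES
      .map { |attribute| [attribute, public_send(attribute)] }
      .to_h
      .merge!(id: id, consumer: consumer)
  end
end

topic = HashableTopic.new(id: 'group_events', name: 'events', max_bytes: 1024)
p topic.to_h
```

Going through `public_send` rather than reading instance variables means any defaulting logic in the readers is reflected in the resulting hash, which is what a validation step would want to see.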