Class: Karafka::Routing::Topic

Inherits:
Object
Extended by:
Forwardable, Helpers::ConfigRetriever
Defined in:
lib/karafka/routing/topic.rb

Overview

Topic stores all the details on how we should interact with a given Kafka topic. It belongs to a consumer group, because as of Karafka 0.6 all topics can work within the same consumer group. It is part of Karafka’s DSL.

Instance Attribute Summary

Instance Method Summary

Methods included from Helpers::ConfigRetriever

config_retriever_for

Constructor Details

#initialize(name, consumer_group) ⇒ Topic

Returns a new instance of Topic.

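Parameters:

  • name

    topic name; stored as a String via to_s

  • consumer_group

    consumer group that owns this topic; its id is used as the prefix of the topic id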

# File 'lib/karafka/routing/topic.rb', line 19

def initialize(name, consumer_group)
  @name = name.to_s
  @consumer_group = consumer_group
  @attributes = {}
  # @note We use identifier related to the consumer group that owns a topic, because from
  #   Karafka 0.6 we can handle multiple Kafka instances with the same process and we can
  #   have same topic name across multiple Kafkas
  @id = "#{consumer_group.id}_#{@name}"
end
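
The identifier scheme used by the constructor can be illustrated without loading Karafka. The sketch below is only an approximation: `FakeGroup` and `SketchTopic` are hypothetical stand-ins (not Karafka classes) that copy just the lines of the constructor relevant to `@id`, and the group ids are made up.

```ruby
# Hypothetical stand-in for a consumer group; only #id matters here.
FakeGroup = Struct.new(:id)

# Minimal stand-in that mirrors how the constructor derives @id.
class SketchTopic
  attr_reader :name, :id

  def initialize(name, consumer_group)
    @name = name.to_s
    # Prefix the topic name with the owning group's id.
    @id = "#{consumer_group.id}_#{@name}"
  end
end

# Two made-up consumer groups, e.g. one per Kafka cluster.
group_a = FakeGroup.new('app_cluster_a')
group_b = FakeGroup.new('app_cluster_b')

# The same topic name registered under both groups.
topic_a = SketchTopic.new(:events, group_a)
topic_b = SketchTopic.new(:events, group_b)

puts topic_a.id
puts topic_b.id
puts topic_a.name == topic_b.name
```

Both stand-in topics share the name `events`, yet their ids differ because each one is prefixed with the id of its owning group.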

Instance Attribute Details

#consumer ⇒ Object

Returns the value of attribute consumer.



# File 'lib/karafka/routing/topic.rb', line 13

def consumer
  @consumer
end

#consumer_group ⇒ Object (readonly)

Returns the value of attribute consumer_group.



# File 'lib/karafka/routing/topic.rb', line 12

def consumer_group
  @consumer_group
end

#id ⇒ Object (readonly)

Returns the value of attribute id.



# File 'lib/karafka/routing/topic.rb', line 12

def id
  @id
end

Instance Method Details

#build ⇒ Object

Initializes default values for all the options that support defaults if their values are not yet specified. This has to be done eagerly (it cannot be lazy loaded on first use), because everywhere except the Karafka server command those values would not be initialized in time, for example in Sidekiq.



# File 'lib/karafka/routing/topic.rb', line 33

def build
  Karafka::AttributesMap.topic.each { |attr| send(attr) }
  self
end
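
The eager initialization performed by `build` can be sketched in isolation. Everything in the block below is an assumption made for illustration: `EAGER_ATTRIBUTES` stands in for `Karafka::AttributesMap.topic`, `EAGER_DEFAULTS` stands in for application-level configuration, and `EagerTopic` is a hypothetical class whose readers fall back to a default on first call. It is not how `Helpers::ConfigRetriever` is actually implemented.

```ruby
# Hypothetical attribute list standing in for Karafka::AttributesMap.topic.
EAGER_ATTRIBUTES = %i[max_bytes_per_partition start_from_beginning].freeze

# Made-up defaults standing in for application-level configuration.
EAGER_DEFAULTS = {
  max_bytes_per_partition: 1_048_576,
  start_from_beginning: true
}.freeze

class EagerTopic
  EAGER_ATTRIBUTES.each do |attr|
    attr_writer attr

    # Reader that stores the default the first time it is called
    # if no value was assigned explicitly.
    define_method(attr) do
      current = instance_variable_get(:"@#{attr}")
      return current unless current.nil?

      instance_variable_set(:"@#{attr}", EAGER_DEFAULTS.fetch(attr))
    end
  end

  # Same shape as Topic#build: call every reader, then return self.
  def build
    EAGER_ATTRIBUTES.each { |attr| send(attr) }
    self
  end
end

eager_topic = EagerTopic.new
eager_topic.max_bytes_per_partition = 512

# Before build, only the explicitly assigned attribute is set.
before_build = eager_topic.instance_variables
p before_build

# After build, every attribute holds either its assigned or default value.
eager_topic.build
p eager_topic.instance_variables.sort
p eager_topic.max_bytes_per_partition
p eager_topic.start_from_beginning
```

Calling `build` once means that later readers of the object never depend on a default being resolved at first use; explicitly assigned values are left untouched.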

#responder ⇒ Class?

Returns the class (not an instance) of the responder that should respond from the consumer back to Kafka (useful for piping data flows).

Returns:

  • (Class, nil)

    class (not an instance) of the responder that should respond from the consumer back to Kafka (useful for piping data flows)



# File 'lib/karafka/routing/topic.rb', line 40

def responder
  @responder ||= Karafka::Responders::Builder.new(consumer).build
end

#to_h ⇒ Hash

Note:

This is used when we validate the consumer_group and its topics.

Returns a hash with all the topic attributes.

Returns:

  • (Hash)

    hash with all the topic attributes



# File 'lib/karafka/routing/topic.rb', line 50

def to_h
  map = Karafka::AttributesMap.topic.map do |attribute|
    [attribute, public_send(attribute)]
  end

  Hash[map].merge!(
    id: id,
    consumer: consumer
  )
end
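
The way `to_h` assembles its result can be tried out with a small stand-alone sketch. The names here are assumptions made for illustration only: `HASH_ATTRIBUTES` stands in for `Karafka::AttributesMap.topic`, and `HashableTopic` and `EventsConsumer` are hypothetical classes, not part of Karafka.

```ruby
# Hypothetical attribute list standing in for Karafka::AttributesMap.topic.
HASH_ATTRIBUTES = %i[name backend].freeze

# Hypothetical consumer class; only its identity is used below.
class EventsConsumer; end

class HashableTopic
  attr_reader :id, :consumer, *HASH_ATTRIBUTES

  def initialize(id:, consumer:, name:, backend:)
    @id = id
    @consumer = consumer
    @name = name
    @backend = backend
  end

  # Same steps as Topic#to_h: pair each mapped attribute with its
  # current value, then add the id and the consumer class.
  def to_h
    pairs = HASH_ATTRIBUTES.map do |attribute|
      [attribute, public_send(attribute)]
    end

    pairs.to_h.merge(id: id, consumer: consumer)
  end
end

hash_topic = HashableTopic.new(
  id: 'group_events',
  consumer: EventsConsumer,
  name: 'events',
  backend: :inline
)

p hash_topic.to_h
```

The sketch uses the non-mutating `merge` instead of `merge!`; since the hash is freshly built inside the method, the result is the same.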