Class: Karafka::Routing::Topic

Inherits:
Object
Defined in:
lib/karafka/routing/topic.rb

Overview

Topic stores all the details on how we should interact with a given Kafka topic. It belongs to a consumer group because, as of 0.6, all topics can work in the same consumer group. It is part of Karafka’s DSL.

Constructor Details

#initialize(name, consumer_group) ⇒ Topic

Returns a new instance of Topic.

Parameters:



# File 'lib/karafka/routing/topic.rb', line 34

def initialize(name, consumer_group)
  @name = name.to_s
  @consumer_group = consumer_group
  @attributes = {}
  @active = true
  # @note We use identifier related to the consumer group that owns a topic, because from
  #   Karafka 0.6 we can handle multiple Kafka instances with the same process and we can
  #   have same topic name across multiple consumer groups
  @id = "#{consumer_group.id}_#{@name}"
  @consumer = nil
  @active_assigned = false
  @subscription_group_details = nil

  INHERITABLE_ATTRIBUTES.each do |attribute|
    instance_variable_set("@#{attribute}", nil)
  end
end
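
The identifier scheme used in the constructor can be illustrated with a minimal stand-alone sketch. `FakeGroup` and `build_topic_id` are hypothetical names introduced only for this illustration; they are not part of Karafka.

```ruby
# Hypothetical stand-in for a consumer group; only #id matters here.
FakeGroup = Struct.new(:id)

# Mirrors the id composition from the constructor: "<consumer group id>_<topic name>".
def build_topic_id(name, consumer_group)
  "#{consumer_group.id}_#{name}"
end

group_a = FakeGroup.new("group_a")
group_b = FakeGroup.new("group_b")

# The constructor converts the name with #to_s, so a symbol and a string are equivalent.
id_a = build_topic_id(:events.to_s, group_a)
id_b = build_topic_id("events", group_b)

puts id_a
puts id_b
```

Because the consumer group id is part of the identifier, the same topic name used in two consumer groups yields two distinct ids.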

Instance Attribute Details

#consumer ⇒ Class

Returns consumer class that we should use.

Returns:

  • (Class)

    consumer class that we should use



# File 'lib/karafka/routing/topic.rb', line 85

def consumer
  if consumer_persistence
    # When persistence of consumers is on, no need to reload them
    @consumer
  else
    # In order to support code reload without having to change the topic api, we re-fetch the
    # class of a consumer based on its class name. This will support all the cases where the
    # consumer class is defined with a name. It won't support code reload for anonymous
    # consumer classes, but this is an edge case
    begin
      ::Object.const_get(@consumer.to_s)
    rescue NameError
      # It will only fail in the case of anonymous classes
      @consumer
    end
  end
end
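
The name-based lookup above can be tried in isolation. The following is a minimal sketch, assuming a hypothetical `SketchConsumer` class and a hypothetical `resolve_consumer` helper; it simulates a code reload by removing the constant and defining a new class under the same name.

```ruby
# Hypothetical consumer class, defined only for this sketch.
class SketchConsumer
  def self.version
    1
  end
end

# Same strategy as the non-persistent branch of #consumer:
# resolve the class by its name, fall back to the stored reference on NameError.
def resolve_consumer(stored)
  ::Object.const_get(stored.to_s)
rescue NameError
  stored
end

stored = SketchConsumer

# Simulate a code reload: drop the constant and define a fresh class with the same name.
Object.send(:remove_const, :SketchConsumer)
class SketchConsumer
  def self.version
    2
  end
end

# A named class resolves to the reloaded definition.
reloaded = resolve_consumer(stored)

# An anonymous class has no constant name, so the stored reference is returned.
anonymous = Class.new
fallback = resolve_consumer(anonymous)
```

The stored reference still points at the old class object, while the lookup by name returns the newly defined one. This is why the re-fetch supports code reload only for named classes.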

#consumer_group ⇒ Object (readonly)

Returns the value of attribute consumer_group.



# File 'lib/karafka/routing/topic.rb', line 9

def consumer_group
  @consumer_group
end

#id ⇒ Object (readonly)

Returns the value of attribute id.



# File 'lib/karafka/routing/topic.rb', line 9

def id
  @id
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



# File 'lib/karafka/routing/topic.rb', line 9

def name
  @name
end

#subscription_group ⇒ Object

The full subscription group reference can be built only once the whole routing tree is known, which is why it is set later on.



# File 'lib/karafka/routing/topic.rb', line 16

def subscription_group
  @subscription_group
end

#subscription_group_details ⇒ Object

Returns the value of attribute subscription_group_details.



# File 'lib/karafka/routing/topic.rb', line 12

def subscription_group_details
  @subscription_group_details
end

Instance Method Details

#active(active) ⇒ Object

Allows disabling the topic by invoking this method with `false`.

Parameters:

  • active (Boolean)

    should this topic be consumed or not



# File 'lib/karafka/routing/topic.rb', line 105

def active(active)
  # Do not allow for active overrides. Basically if this is set on the topic level, defaults
  # will not overwrite it and this is desired. Otherwise because of the fact that this is
  # not a full feature config but just a flag, default value would always overwrite the
  # per-topic config since defaults application happens after the topic config block
  unless @active_assigned
    @active = active
    @active_assigned = true
  end

  @active
end
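
The assign-once behaviour described in the comment can be shown with a minimal sketch. `ActiveFlag` is a hypothetical class that reproduces only the flag logic of `#active`; it is not part of Karafka.

```ruby
# Hypothetical minimal class reproducing only the assign-once logic of #active.
class ActiveFlag
  def initialize
    @active = true
    @active_assigned = false
  end

  # Only the first call changes the value; later calls return the stored one.
  def active(value)
    unless @active_assigned
      @active = value
      @active_assigned = true
    end

    @active
  end
end

flag = ActiveFlag.new

# The topic-level setting is applied first and wins.
first = flag.active(false)

# A later call, such as the application of defaults, is ignored.
second = flag.active(true)
```

Both calls return `false` here, because the second assignment arrives after the flag has already been set.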

#active? ⇒ Boolean

Returns should this topic be in use.

Returns:

  • (Boolean)

    should this topic be in use



# File 'lib/karafka/routing/topic.rb', line 128

def active?
  # Never active if disabled via routing
  return false unless @active

  Karafka::App.config.internal.routing.activity_manager.active?(:topics, name)
end

#consumer_class ⇒ Class

Note:

This is just an alias to the `#consumer` method. However, we want to use it internally instead of referencing `#consumer`, to indicate that this method returns a class and not an instance. In the routing we want to keep the `#consumer Consumer` routing syntax, but for references outside the routing, we should use this one.

Returns consumer class that we should use.

Returns:

  • (Class)

    consumer class that we should use



# File 'lib/karafka/routing/topic.rb', line 123

def consumer_class
  consumer
end

#kafka=(settings = {}) ⇒ Object

Note:

`inherit` is set to `false` by default to preserve backwards compatibility.

Users often want the same basic cluster setup with small setting alterations. This method allows that by setting `inherit` to `true`. When `inherit` is enabled, the settings are merged with the defaults.

Parameters:

  • settings (Hash) (defaults to: {})

    kafka scope settings. If the `:inherit` key is provided, it will instruct the assignment to merge with root level defaults



# File 'lib/karafka/routing/topic.rb', line 73

def kafka=(settings = {})
  inherit = settings.delete(:inherit)

  @kafka = inherit ? Karafka::App.config.kafka.merge(settings) : settings
end
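
The difference between inheriting and replacing can be sketched without Karafka. `ROOT_KAFKA` is a hypothetical stand-in for `Karafka::App.config.kafka`, and `resolve_kafka` is a hypothetical helper. Unlike the setter above, this sketch duplicates the hash first so the caller's hash is not modified by `delete`.

```ruby
# Hypothetical root-level defaults standing in for Karafka::App.config.kafka.
ROOT_KAFKA = {
  "bootstrap.servers": "127.0.0.1:9092",
  "max.poll.interval.ms": 300_000
}.freeze

# Same decision as in #kafka=: merge with defaults only when :inherit is truthy.
def resolve_kafka(settings, defaults)
  settings = settings.dup # assumption of this sketch: leave the caller's hash untouched
  inherit = settings.delete(:inherit)

  inherit ? defaults.merge(settings) : settings
end

# With inherit: true, topic settings override matching keys and keep the rest.
merged = resolve_kafka({ "max.poll.interval.ms": 60_000, inherit: true }, ROOT_KAFKA)

# Without :inherit, the topic settings replace the defaults entirely.
replaced = resolve_kafka({ "max.poll.interval.ms": 60_000 }, ROOT_KAFKA)
```

Note that the `:inherit` key itself is removed before the settings are stored, so it never reaches the resulting configuration.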

#subscription_name ⇒ String

Returns name of subscription that will go to librdkafka.

Returns:

  • (String)

    name of subscription that will go to librdkafka



# File 'lib/karafka/routing/topic.rb', line 80

def subscription_name
  name
end

#to_h ⇒ Hash

Note:

This is used when we validate the consumer group and its topics.

Returns hash with all the topic attributes.

Returns:

  • (Hash)

    hash with all the topic attributes



# File 'lib/karafka/routing/topic.rb', line 137

def to_h
  map = INHERITABLE_ATTRIBUTES.map do |attribute|
    [attribute, public_send(attribute)]
  end

  Hash[map].merge!(
    id: id,
    name: name,
    active: active?,
    consumer: consumer,
    consumer_group_id: consumer_group.id,
    subscription_group_details: subscription_group_details
  ).freeze
end
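
The pattern used by `#to_h` (collect attributes through `public_send`, merge extra keys, then freeze) can be sketched on its own. `SketchTopic` and its `ATTRIBUTES` list are hypothetical; the real contents of `INHERITABLE_ATTRIBUTES` are not shown in this section, so the attribute names below are chosen only for illustration.

```ruby
# Hypothetical class; ATTRIBUTES stands in for INHERITABLE_ATTRIBUTES.
class SketchTopic
  ATTRIBUTES = %i[max_messages max_wait_time].freeze

  attr_accessor(*ATTRIBUTES)
  attr_reader :name

  def initialize(name)
    @name = name.to_s
  end

  # Reads every listed attribute via its public reader, adds extra keys
  # and freezes the result so it cannot be modified afterwards.
  def to_h
    ATTRIBUTES
      .to_h { |attribute| [attribute, public_send(attribute)] }
      .merge!(name: name)
      .freeze
  end
end

topic = SketchTopic.new(:events)
topic.max_messages = 100

snapshot = topic.to_h
```

Attributes that were never assigned appear in the hash with a `nil` value, and any attempt to modify the frozen result raises a `FrozenError`.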