Class: Karafka::Routing::ConsumerGroup

Inherits:
Object
  • Object
show all
Extended by:
Helpers::ConfigRetriever
Defined in:
lib/karafka/routing/consumer_group.rb

Overview

Object used to describe a single consumer group that is going to subscribe to given topics It is a part of Karafka’s DSL

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Helpers::ConfigRetriever

config_retriever_for

Constructor Details

#initialize(name) ⇒ ConsumerGroup

Returns a new instance of ConsumerGroup.

Parameters:

  • name (String, Symbol)

    raw name of this consumer group. Raw means, that it does not yet have an application client_id namespace, this will be added here by default. We add it to make a multi-system development easier for people that don’t use kafka and don’t understand the concept of consumer groups.



21
22
23
24
25
# File 'lib/karafka/routing/consumer_group.rb', line 21

def initialize(name)
  @name = name
  @id = Karafka::App.config.consumer_mapper.call(name)
  @topics = []
end

Instance Attribute Details

#idObject (readonly)

Returns the value of attribute id.



11
12
13
# File 'lib/karafka/routing/consumer_group.rb', line 11

def id
  @id
end

#nameObject (readonly)

Returns the value of attribute name.



11
12
13
# File 'lib/karafka/routing/consumer_group.rb', line 11

def name
  @name
end

#topicsObject (readonly)

Returns the value of attribute topics.



11
12
13
# File 'lib/karafka/routing/consumer_group.rb', line 11

def topics
  @topics
end

Instance Method Details

#active?Boolean

Returns true if this consumer group should be active in our current process.

Returns:

  • (Boolean)

    true if this consumer group should be active in our current process



28
29
30
# File 'lib/karafka/routing/consumer_group.rb', line 28

def active?
  Karafka::Server.consumer_groups.include?(name)
end

#to_hHash

Hashed version of consumer group that can be used for validation purposes topics inside of it.

Returns:

  • (Hash)

    hash with consumer group attributes including serialized to hash



49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/karafka/routing/consumer_group.rb', line 49

def to_h
  result = {
    topics: topics.map(&:to_h),
    id: id
  }

  Karafka::AttributesMap.consumer_group.each do |attribute|
    result[attribute] = public_send(attribute)
  end

  result
end

#topic=(name, &block) ⇒ Karafka::Routing::Topic

Builds a topic representation inside of a current consumer group route

Parameters:

  • name (String, Symbol)

    name of topic to which we want to subscribe

  • block (Proc)

    block that we want to evaluate in the topic context

Returns:



36
37
38
39
40
# File 'lib/karafka/routing/consumer_group.rb', line 36

def topic=(name, &block)
  topic = Topic.new(name, self)
  @topics << Proxy.new(topic, &block).target.tap(&:build)
  @topics.last
end