Class: Karafka::Admin::Topics

Inherits:
Karafka::Admin show all
Defined in:
lib/karafka/admin/topics.rb

Overview

Topic administration operations. Provides methods to manage Kafka topics, including creation, deletion, reading, and introspection.

Class Method Summary

Methods inherited from Karafka::Admin

cluster_info, copy_consumer_group, create_topic, delete_consumer_group, delete_topic, read_lags_with_offsets, read_topic, rename_consumer_group, seek_consumer_group, topic_info, with_admin, with_consumer

Class Method Details

.create(name, partitions, replication_factor, topic_config = {}) ⇒ void

This method returns an undefined value.

Creates a Kafka topic with the given settings

Parameters:

  • name (String)

    topic name

  • partitions (Integer)

    number of partitions we expect

  • replication_factor (Integer)

    number of replicas

  • topic_config (Hash) (defaults to: {})

    topic config details as described here: kafka.apache.org/documentation/#topicconfigs



# File 'lib/karafka/admin/topics.rb', line 98

def create(name, partitions, replication_factor, topic_config = {})
  with_admin do |admin|
    handler = admin.create_topic(name, partitions, replication_factor, topic_config)

    with_re_wait(
      -> { handler.wait(max_wait_timeout: max_wait_time_seconds) },
      -> { names.include?(name) }
    )
  end
end
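A hypothetical usage sketch (the topic name, partition count, and config values below are illustrative, not from this page). The actual call needs a reachable Kafka cluster, so it is shown commented out; the `topic_config` hash uses standard Kafka topic config keys:

```ruby
# Illustrative topic config; keys follow kafka.apache.org/documentation/#topicconfigs
topic_config = {
  'cleanup.policy' => 'compact',
  'retention.ms' => '86400000' # 1 day
}

# Requires a running Kafka cluster:
# Karafka::Admin::Topics.create('events', 6, 2, topic_config)
puts topic_config.fetch('cleanup.policy')
```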

.create_partitions(name, partitions) ⇒ void

This method returns an undefined value.

Creates more partitions for a given topic

Parameters:

  • name (String)

    topic name

  • partitions (Integer)

    total number of partitions we expect to end up with



# File 'lib/karafka/admin/topics.rb', line 131

def create_partitions(name, partitions)
  with_admin do |admin|
    handler = admin.create_partitions(name, partitions)

    with_re_wait(
      -> { handler.wait(max_wait_timeout: max_wait_time_seconds) },
      -> { info(name).fetch(:partition_count) >= partitions }
    )
  end
end
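Note that `partitions` is the total target count, not an increment, and Kafka can only grow a topic's partition count. A minimal pre-flight sketch with illustrative values (the guard and the topic name are assumptions, not part of the API):

```ruby
current_partitions = 3 # e.g. taken from info(name).fetch(:partition_count)
target_partitions = 6  # the total we expect to end up with, not an increment

# Kafka cannot shrink a topic; reducing partitions requires recreating it
raise ArgumentError, 'cannot shrink partitions' if target_partitions < current_partitions

# Requires a running Kafka cluster:
# Karafka::Admin::Topics.create_partitions('events', target_partitions)
puts target_partitions - current_partitions # partitions that would be added
```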

.delete(name) ⇒ void

This method returns an undefined value.

Deletes a given topic

Parameters:

  • name (String)

    topic name



# File 'lib/karafka/admin/topics.rb', line 114

def delete(name)
  with_admin do |admin|
    handler = admin.delete_topic(name)

    with_re_wait(
      -> { handler.wait(max_wait_timeout: max_wait_time_seconds) },
      -> { !names.include?(name) }
    )
  end
end

.info(topic_name) ⇒ Hash

Note:

This query is much more efficient than a full `#cluster_info` plus topic lookup because it queries only the topic we're interested in rather than the data for all topics

Returns basic topic metadata

Parameters:

  • topic_name (String)

    name of the topic we’re interested in

Returns:

  • (Hash)

    topic metadata info hash

Raises:

  • (Rdkafka::RdkafkaError)

`unknown_topic_or_part` if the requested topic is not found



# File 'lib/karafka/admin/topics.rb', line 162

def info(topic_name)
  with_admin do |admin|
    admin
      .metadata(topic_name)
      .topics
      .find { |topic| topic[:topic_name] == topic_name }
  end
end
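A sketch of working with the returned hash. This page only confirms the `:topic_name` and `:partition_count` keys (used by `#create_partitions` and the lookup above); treat the rest of the hash shape as implementation-defined. The literal hash below is an illustrative stand-in, not real cluster output:

```ruby
# info = Karafka::Admin::Topics.info('events') # requires a cluster
# Illustrative stand-in for the returned metadata hash:
info = { topic_name: 'events', partition_count: 6 }

puts info.fetch(:partition_count)
```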

.read(name, partition, count, start_offset = -1, settings = {}) ⇒ Array<Karafka::Messages::Message>

Allows us to read messages from the topic

Parameters:

  • name (String, Symbol)

    topic name

  • partition (Integer)

    partition

  • count (Integer)

    how many messages we want to get at most

  • start_offset (Integer, Time) (defaults to: -1)

    offset from which we should start. If -1 is provided (the default), we start from the latest offset. If a Time is provided, the matching offset is resolved. A negative value beyond -1 moves further back from the high watermark.

  • settings (Hash) (defaults to: {})

    kafka extra settings (optional)

Returns:

  • (Array<Karafka::Messages::Message>)

    messages read from the given topic partition

# File 'lib/karafka/admin/topics.rb', line 21

def read(name, partition, count, start_offset = -1, settings = {})
  messages = []
  tpl = Rdkafka::Consumer::TopicPartitionList.new
  low_offset, high_offset = nil

  with_consumer(settings) do |consumer|
    # Convert the time offset (if needed)
    start_offset = resolve_offset(consumer, name.to_s, partition, start_offset)

    low_offset, high_offset = consumer.query_watermark_offsets(name, partition)

    # Select offset dynamically if -1 or less and move backwards with the negative
    # offset, allowing to start from N messages back from high-watermark
    start_offset = high_offset - count - start_offset.abs + 1 if start_offset.negative?
    start_offset = low_offset if start_offset.negative?

    # Build the requested range - since first element is on the start offset we need to
    # subtract one from requested count to end up with expected number of elements
    requested_range = (start_offset..start_offset + (count - 1))
    # Establish theoretical available range. Note, that this does not handle cases related
    # to log retention or compaction
    available_range = (low_offset..(high_offset - 1))
    # Select only offset that we can select. This will remove all the potential offsets
    # that are below the low watermark offset
    possible_range = requested_range.select { |offset| available_range.include?(offset) }

    start_offset = possible_range.first
    count = possible_range.size

    tpl.add_topic_and_partitions_with_offsets(name, partition => start_offset)
    consumer.assign(tpl)

    # We should poll as long as we don't have all the messages that we need or as long as
    # we do not read all the messages from the topic
    loop do
      # If we've got as many messages as we've wanted stop
      break if messages.size >= count

      message = consumer.poll(200)

      next unless message

      # If the message we've got is beyond the requested range, stop
      break unless possible_range.include?(message.offset)

      messages << message
    rescue Rdkafka::RdkafkaError => e
      # End of partition
      break if e.code == :partition_eof

      raise e
    end
  end

  # Use topic from routes if we can match it or create a dummy one
  # Dummy one is used in case we cannot match the topic with routes. This can happen
  # when admin API is used to read topics that are not part of the routing
  topic = ::Karafka::Routing::Router.find_or_initialize_by_name(name)

  messages.map! do |message|
    Messages::Builders::Message.call(
      message,
      topic,
      Time.now
    )
  end
end
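The negative `start_offset` handling above can be followed as pure offset arithmetic. A sketch with illustrative watermark values (100 and 150 are assumptions, not cluster data), mirroring the range-clamping logic inside `#read`:

```ruby
low_offset = 100  # illustrative low watermark
high_offset = 150 # illustrative high watermark
count = 10
start_offset = -1 # "latest": take the last `count` messages

# Resolve a negative offset relative to the high watermark, then clamp
start_offset = high_offset - count - start_offset.abs + 1 if start_offset.negative?
start_offset = low_offset if start_offset.negative?

# Intersect the requested range with what the partition can actually serve
requested_range = (start_offset..start_offset + (count - 1))
available_range = (low_offset..(high_offset - 1))
possible_range = requested_range.select { |offset| available_range.include?(offset) }

puts possible_range.first # first offset that will be assigned
puts possible_range.size  # effective message count
```

With `start_offset = -1` this resolves to offsets 140..149, i.e. the ten most recent messages.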

.read_watermark_offsets(name, partition) ⇒ Array<Integer, Integer>

Fetches the watermark offsets for a given topic partition

Parameters:

  • name (String, Symbol)

    topic name

  • partition (Integer)

    partition

Returns:

  • (Array<Integer, Integer>)

    low watermark offset and high watermark offset



# File 'lib/karafka/admin/topics.rb', line 147

def read_watermark_offsets(name, partition)
  with_consumer do |consumer|
    consumer.query_watermark_offsets(name, partition)
  end
end
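A usage sketch with illustrative watermark values (the real call needs a cluster, so it is commented out). The high watermark is the offset of the next message to be written, so the difference between the watermarks gives the theoretical number of offsets in the log range, ignoring retention and compaction gaps:

```ruby
# low, high = Karafka::Admin::Topics.read_watermark_offsets('events', 0) # needs a cluster
low, high = 100, 150 # illustrative watermark values

puts high - low # offsets between the watermarks
```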