Class: Karafka::Admin::Topics
- Inherits:
-
Karafka::Admin
- Object
- Karafka::Admin
- Karafka::Admin::Topics
- Defined in:
- lib/karafka/admin/topics.rb
Overview
Topic administration operations Provides methods to manage Kafka topics including creation, deletion, reading, and introspection
Class Method Summary collapse
-
.create(name, partitions, replication_factor, topic_config = {}) ⇒ void
Creates Kafka topic with given settings.
-
.create_partitions(name, partitions) ⇒ void
Creates more partitions for a given topic.
-
.delete(name) ⇒ void
Deleted a given topic.
-
.info(topic_name) ⇒ Hash
Returns basic topic metadata.
-
.read(name, partition, count, start_offset = -1,, settings = {}) ⇒ Array<Karafka::Messages::Message>
Allows us to read messages from the topic.
-
.read_watermark_offsets(name, partition) ⇒ Array<Integer, Integer>
Fetches the watermark offsets for a given topic partition.
Methods inherited from Karafka::Admin
cluster_info, copy_consumer_group, create_topic, delete_consumer_group, delete_topic, read_lags_with_offsets, read_topic, rename_consumer_group, seek_consumer_group, topic_info, with_admin, with_consumer
Class Method Details
.create(name, partitions, replication_factor, topic_config = {}) ⇒ void
This method returns an undefined value.
Creates Kafka topic with given settings
98 99 100 101 102 103 104 105 106 107 |
# File 'lib/karafka/admin/topics.rb', line 98 def create(name, partitions, replication_factor, topic_config = {}) with_admin do |admin| handler = admin.create_topic(name, partitions, replication_factor, topic_config) with_re_wait( -> { handler.wait(max_wait_timeout: max_wait_time_seconds) }, -> { names.include?(name) } ) end end |
.create_partitions(name, partitions) ⇒ void
This method returns an undefined value.
Creates more partitions for a given topic
131 132 133 134 135 136 137 138 139 140 |
# File 'lib/karafka/admin/topics.rb', line 131 def create_partitions(name, partitions) with_admin do |admin| handler = admin.create_partitions(name, partitions) with_re_wait( -> { handler.wait(max_wait_timeout: max_wait_time_seconds) }, -> { info(name).fetch(:partition_count) >= partitions } ) end end |
.delete(name) ⇒ void
This method returns an undefined value.
Deleted a given topic
114 115 116 117 118 119 120 121 122 123 |
# File 'lib/karafka/admin/topics.rb', line 114 def delete(name) with_admin do |admin| handler = admin.delete_topic(name) with_re_wait( -> { handler.wait(max_wait_timeout: max_wait_time_seconds) }, -> { !names.include?(name) } ) end end |
.info(topic_name) ⇒ Hash
This query is much more efficient than doing a full ‘#cluster_info` + topic lookup because it does not have to query for all the topics data but just the topic we’re interested in
Returns basic topic metadata
162 163 164 165 166 167 168 169 |
# File 'lib/karafka/admin/topics.rb', line 162 def info(topic_name) with_admin do |admin| admin .(topic_name) .topics .find { |topic| topic[:topic_name] == topic_name } end end |
.read(name, partition, count, start_offset = -1,, settings = {}) ⇒ Array<Karafka::Messages::Message>
Allows us to read messages from the topic
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/karafka/admin/topics.rb', line 21 def read(name, partition, count, start_offset = -1, settings = {}) = [] tpl = Rdkafka::Consumer::TopicPartitionList.new low_offset, high_offset = nil with_consumer(settings) do |consumer| # Convert the time offset (if needed) start_offset = resolve_offset(consumer, name.to_s, partition, start_offset) low_offset, high_offset = consumer.query_watermark_offsets(name, partition) # Select offset dynamically if -1 or less and move backwards with the negative # offset, allowing to start from N messages back from high-watermark start_offset = high_offset - count - start_offset.abs + 1 if start_offset.negative? start_offset = low_offset if start_offset.negative? # Build the requested range - since first element is on the start offset we need to # subtract one from requested count to end up with expected number of elements requested_range = (start_offset..start_offset + (count - 1)) # Establish theoretical available range. Note, that this does not handle cases related # to log retention or compaction available_range = (low_offset..(high_offset - 1)) # Select only offset that we can select. This will remove all the potential offsets # that are below the low watermark offset possible_range = requested_range.select { |offset| available_range.include?(offset) } start_offset = possible_range.first count = possible_range.size tpl.add_topic_and_partitions_with_offsets(name, partition => start_offset) consumer.assign(tpl) # We should poll as long as we don't have all the messages that we need or as long as # we do not read all the messages from the topic loop do # If we've got as many messages as we've wanted stop break if .size >= count = consumer.poll(200) next unless # If the message we've got is beyond the requested range, stop break unless possible_range.include?(.offset) << rescue Rdkafka::RdkafkaError => e # End of partition break if e.code == :partition_eof raise e end end # Use topic from routes if we can match it or create a dummy one # Dummy one is used in case we cannot match the topic with routes. This can happen # when admin API is used to read topics that are not part of the routing topic = ::Karafka::Routing::Router.find_or_initialize_by_name(name) .map! do || Messages::Builders::Message.call( , topic, Time.now ) end end |
.read_watermark_offsets(name, partition) ⇒ Array<Integer, Integer>
Fetches the watermark offsets for a given topic partition
147 148 149 150 151 |
# File 'lib/karafka/admin/topics.rb', line 147 def read_watermark_offsets(name, partition) with_consumer do |consumer| consumer.query_watermark_offsets(name, partition) end end |