Class: Rdkafka::Admin
- Inherits:
-
Object
- Object
- Rdkafka::Admin
- Defined in:
- lib/rdkafka/admin.rb,
lib/rdkafka/admin/create_topic_handle.rb,
lib/rdkafka/admin/create_topic_report.rb,
lib/rdkafka/admin/delete_topic_handle.rb,
lib/rdkafka/admin/delete_topic_report.rb
Defined Under Namespace
Classes: CreateTopicHandle, CreateTopicReport, DeleteTopicHandle, DeleteTopicReport
Instance Method Summary collapse
-
#close ⇒ Object
Close this admin instance.
-
#create_topic(topic_name, partition_count, replication_factor, topic_config = {}) ⇒ CreateTopicHandle
Create a topic with the given partition count and replication factor.
-
#delete_topic(topic_name) ⇒ DeleteTopicHandle
Delete the named topic.
Instance Method Details
#close ⇒ Object
Close this admin instance
22 23 24 25 26 27 28 29 30 31 |
# File 'lib/rdkafka/admin.rb', line 22

# Close this admin instance.
#
# Does nothing when there is no native handle (for example when the
# instance was already closed). Otherwise flags the instance as closing,
# waits for the polling thread to finish, destroys the native handle and
# clears the reference to it.
#
# @return [nil]
def close
  return unless @native_kafka

  # Indicate to polling thread that we're closing
  @closing = true
  # Wait for the polling thread to finish up
  @polling_thread.join

  Rdkafka::Bindings.rd_kafka_destroy(@native_kafka)

  # Drop the reference so a repeated call takes the early return above
  @native_kafka = nil
end
#create_topic(topic_name, partition_count, replication_factor, topic_config = {}) ⇒ CreateTopicHandle
Create a topic with the given partition count and replication factor
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/rdkafka/admin.rb', line 40

# Create a topic with the given partition count and replication factor.
#
# The request is enqueued on the background queue of the native client;
# the returned handle is registered as pending with a response of -1.
#
# @param topic_name [String] name of the topic to create
# @param partition_count [Integer] number of partitions for the new topic
# @param replication_factor [Integer] replication factor for the new topic
# @param topic_config [Hash, nil] optional topic configuration; every key
#   and value is converted with +to_s+ before being applied
# @return [CreateTopicHandle] handle registered for this request
# @raise [Rdkafka::Config::ConfigError] when the native topic description
#   cannot be created (the message is read from the native error buffer)
#   or when the background queue pointer is NULL
def create_topic(topic_name, partition_count, replication_factor, topic_config={})
  # Create a rd_kafka_NewTopic_t representing the new topic
  error_buffer = FFI::MemoryPointer.from_string(" " * 256)
  new_topic_ptr = Rdkafka::Bindings.rd_kafka_NewTopic_new(
    FFI::MemoryPointer.from_string(topic_name),
    partition_count,
    replication_factor,
    error_buffer,
    256
  )
  if new_topic_ptr.null?
    raise Rdkafka::Config::ConfigError.new(error_buffer.read_string)
  end

  unless topic_config.nil?
    topic_config.each do |key, value|
      Rdkafka::Bindings.rd_kafka_NewTopic_set_config(
        new_topic_ptr,
        key.to_s,
        value.to_s
      )
    end
  end

  # Note that rd_kafka_CreateTopics can create more than one topic at a time
  pointer_array = [new_topic_ptr]
  topics_array_ptr = FFI::MemoryPointer.new(:pointer)
  topics_array_ptr.write_array_of_pointer(pointer_array)

  # Get a pointer to the queue that our request will be enqueued on
  queue_ptr = Rdkafka::Bindings.rd_kafka_queue_get_background(@native_kafka)
  if queue_ptr.null?
    Rdkafka::Bindings.rd_kafka_NewTopic_destroy(new_topic_ptr)
    raise Rdkafka::Config::ConfigError.new("rd_kafka_queue_get_background was NULL")
  end

  # Create and register the handle we will return to the caller
  create_topic_handle = CreateTopicHandle.new
  create_topic_handle[:pending] = true
  create_topic_handle[:response] = -1
  CreateTopicHandle.register(create_topic_handle)

  # Attach the handle as the opaque value of the admin options for this request
  admin_options_ptr = Rdkafka::Bindings.rd_kafka_AdminOptions_new(@native_kafka, Rdkafka::Bindings::RD_KAFKA_ADMIN_OP_CREATETOPICS)
  Rdkafka::Bindings.rd_kafka_AdminOptions_set_opaque(admin_options_ptr, create_topic_handle.to_ptr)

  begin
    Rdkafka::Bindings.rd_kafka_CreateTopics(
      @native_kafka,
      topics_array_ptr,
      1,
      admin_options_ptr,
      queue_ptr
    )
  rescue Exception
    # Unregister the handle on any failure, then re-raise the error unchanged
    CreateTopicHandle.remove(create_topic_handle.to_ptr.address)
    raise
  ensure
    # Release the native objects created above whether or not the call succeeded
    Rdkafka::Bindings.rd_kafka_AdminOptions_destroy(admin_options_ptr)
    Rdkafka::Bindings.rd_kafka_queue_destroy(queue_ptr)
    Rdkafka::Bindings.rd_kafka_NewTopic_destroy(new_topic_ptr)
  end

  create_topic_handle
end
#delete_topic(topic_name) ⇒ DeleteTopicHandle
Delete the named topic
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/rdkafka/admin.rb', line 110

# Delete the named topic.
#
# The request is enqueued on the background queue of the native client;
# the returned handle is registered as pending with a response of -1.
#
# @param topic_name [String] name of the topic to delete
# @return [DeleteTopicHandle] handle registered for this request
# @raise [Rdkafka::Config::ConfigError] when the background queue pointer
#   is NULL
def delete_topic(topic_name)
  # Create a rd_kafka_DeleteTopic_t representing the topic to be deleted
  delete_topic_ptr = Rdkafka::Bindings.rd_kafka_DeleteTopic_new(FFI::MemoryPointer.from_string(topic_name))

  # Note that rd_kafka_DeleteTopics can delete more than one topic at a time
  pointer_array = [delete_topic_ptr]
  topics_array_ptr = FFI::MemoryPointer.new(:pointer)
  topics_array_ptr.write_array_of_pointer(pointer_array)

  # Get a pointer to the queue that our request will be enqueued on
  queue_ptr = Rdkafka::Bindings.rd_kafka_queue_get_background(@native_kafka)
  if queue_ptr.null?
    Rdkafka::Bindings.rd_kafka_DeleteTopic_destroy(delete_topic_ptr)
    raise Rdkafka::Config::ConfigError.new("rd_kafka_queue_get_background was NULL")
  end

  # Create and register the handle we will return to the caller
  delete_topic_handle = DeleteTopicHandle.new
  delete_topic_handle[:pending] = true
  delete_topic_handle[:response] = -1
  DeleteTopicHandle.register(delete_topic_handle)

  # Attach the handle as the opaque value of the admin options for this request
  admin_options_ptr = Rdkafka::Bindings.rd_kafka_AdminOptions_new(@native_kafka, Rdkafka::Bindings::RD_KAFKA_ADMIN_OP_DELETETOPICS)
  Rdkafka::Bindings.rd_kafka_AdminOptions_set_opaque(admin_options_ptr, delete_topic_handle.to_ptr)

  begin
    Rdkafka::Bindings.rd_kafka_DeleteTopics(
      @native_kafka,
      topics_array_ptr,
      1,
      admin_options_ptr,
      queue_ptr
    )
  rescue Exception
    # Unregister the handle on any failure, then re-raise the error unchanged
    DeleteTopicHandle.remove(delete_topic_handle.to_ptr.address)
    raise
  ensure
    # Release the native objects created above whether or not the call succeeded
    Rdkafka::Bindings.rd_kafka_AdminOptions_destroy(admin_options_ptr)
    Rdkafka::Bindings.rd_kafka_queue_destroy(queue_ptr)
    Rdkafka::Bindings.rd_kafka_DeleteTopic_destroy(delete_topic_ptr)
  end

  delete_topic_handle
end