Class: Rdkafka::Consumer
- Inherits: Object
  - Object
    - Rdkafka::Consumer
- Includes: Enumerable
- Defined in:
  lib/rdkafka/consumer.rb,
  lib/rdkafka/consumer/message.rb,
  lib/rdkafka/consumer/partition.rb,
  lib/rdkafka/consumer/topic_partition_list.rb
Overview
A consumer of Kafka messages. It uses the high-level consumer approach, in which the Kafka
brokers automatically assign partitions and balance them across all consumers that
have the same :"group.id" set in their configuration.
To create a consumer, set up a Config and call consumer on it. Setting :"group.id"
in the configuration is mandatory.
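A minimal sketch of that setup, assuming the rdkafka gem is installed and a broker is reachable at localhost:9092. The broker address, the group name, and the build_consumer helper are placeholders for illustration and are not part of the library.

```ruby
# Placeholder settings; :"group.id" is the one key this class requires.
EXAMPLE_SETTINGS = {
  :"bootstrap.servers" => "localhost:9092", # assumed broker address
  :"group.id"          => "example-group"   # assumed consumer group name
}.freeze

# Hypothetical helper: checks for :"group.id" before building the consumer.
def build_consumer(settings = EXAMPLE_SETTINGS)
  unless settings.key?(:"group.id")
    raise ArgumentError, 'settings must include :"group.id"'
  end

  require "rdkafka" # loaded lazily so the check above runs without the gem
  Rdkafka::Config.new(settings).consumer
end
```

Calling build_consumer with the default settings should return an Rdkafka::Consumer that is ready for #subscribe.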
Defined Under Namespace
Classes: Message, Partition, TopicPartitionList
Instance Method Summary
- #close ⇒ nil
  Close this consumer.
- #commit(async = false) ⇒ nil
  Commit the current offsets of this consumer.
- #committed(list, timeout_ms = 200) ⇒ TopicPartitionList
  Return the current committed offset per partition for this consumer group.
- #each {|message| ... } ⇒ nil
  Poll for new messages and yield for each received one.
- #lag(topic_partition_list, watermark_timeout_ms = 100) ⇒ Hash<String, Hash<Integer, Integer>>
  Calculate the consumer lag per partition for the provided topic partition list.
- #poll(timeout_ms) ⇒ Message?
  Poll for the next message on one of the subscribed topics.
- #query_watermark_offsets(topic, partition, timeout_ms = 200) ⇒ Integer
  Query broker for low (oldest/beginning) and high (newest/end) offsets for a partition.
- #subscribe(*topics) ⇒ nil
  Subscribe to one or more topics letting Kafka handle partition assignments.
- #subscription ⇒ TopicPartitionList
  Return the current subscription to topics and partitions.
- #unsubscribe ⇒ nil
  Unsubscribe from all subscribed topics.
Instance Method Details
#close ⇒ nil
Close this consumer.

    # File 'lib/rdkafka/consumer.rb', line 18

    def close
      Rdkafka::Bindings.rd_kafka_consumer_close(@native_kafka)
    end
#commit(async = false) ⇒ nil
Commit the current offsets of this consumer.

    # File 'lib/rdkafka/consumer.rb', line 163

    def commit(async=false)
      response = Rdkafka::Bindings.rd_kafka_commit(@native_kafka, nil, async)
      if response != 0
        raise Rdkafka::RdkafkaError.new(response)
      end
    end
#committed(list, timeout_ms = 200) ⇒ TopicPartitionList
Return the current committed offset per partition for this consumer group. The offset field of each requested partition is set either to the stored offset or to -1001 if no offset was stored for that partition.
TODO: This should use the subscription or assignment by default.

    # File 'lib/rdkafka/consumer.rb', line 86

    def committed(list, timeout_ms=200)
      unless list.is_a?(TopicPartitionList)
        raise TypeError.new("list has to be a TopicPartitionList")
      end
      tpl = list.copy_tpl
      response = Rdkafka::Bindings.rd_kafka_committed(@native_kafka, tpl, timeout_ms)
      if response != 0
        raise Rdkafka::RdkafkaError.new(response)
      end
      Rdkafka::Consumer::TopicPartitionList.new(tpl)
    end
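The -1001 value is easy to overlook, so a small sketch of filtering for it may help. It works on the hash shape that TopicPartitionList#to_h appears to produce in the #lag source (topic name mapped to an array of objects with partition and offset readers). PartitionStub, NO_STORED_OFFSET and partitions_without_offset are hypothetical names used only for this illustration.

```ruby
# Stand-in for Rdkafka::Consumer::Partition; only the two readers used here.
PartitionStub = Struct.new(:partition, :offset)

# Value reported when no offset is stored, per the description above.
NO_STORED_OFFSET = -1001

# Returns { topic => [partition numbers] } for partitions lacking a stored offset.
def partitions_without_offset(list_hash)
  list_hash.each_with_object({}) do |(topic, partitions), missing|
    numbers = partitions.select { |p| p.offset == NO_STORED_OFFSET }.map(&:partition)
    missing[topic] = numbers unless numbers.empty?
  end
end

sample = {
  "events" => [PartitionStub.new(0, 42), PartitionStub.new(1, NO_STORED_OFFSET)],
  "audit"  => [PartitionStub.new(0, 7)]
}
p partitions_without_offset(sample)
```

With a real consumer, the input would presumably come from calling to_h on the list returned by #committed.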
#each {|message| ... } ⇒ nil
Poll for new messages and yield for each received one.

    # File 'lib/rdkafka/consumer.rb', line 205

    def each(&block)
      loop do
        message = poll(250)
        if message
          block.call(message)
        else
          next
        end
      end
    end
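Since #each never returns on its own, callers typically stop it with break or with an Enumerable method that stops early, such as first. The sketch below mimics the loop with a stand-in class so it can run without a broker. QueueConsumer and its in-memory queue are hypothetical and only imitate the poll/each relationship shown in the source.

```ruby
# Stand-in consumer: poll returns queued items, then nil once the queue is empty.
class QueueConsumer
  include Enumerable

  def initialize(items)
    @items = items.dup
  end

  def poll(_timeout_ms)
    @items.shift
  end

  # Same shape as the loop above: yield messages, skip empty polls, never return.
  def each
    loop do
      message = poll(250)
      yield message if message
    end
  end
end

consumer = QueueConsumer.new(%w[a b c])
first_two = consumer.first(2) # Enumerable#first breaks out of the endless loop
puts first_two.inspect
```

Methods that need the whole sequence, such as to_a or map, would never finish on a loop like this one.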
#lag(topic_partition_list, watermark_timeout_ms = 100) ⇒ Hash<String, Hash<Integer, Integer>>
Calculate the consumer lag per partition for the provided topic partition list. You can get a suitable list by calling #committed or position (TODO). You can also create one yourself, in which case the list must already contain every partition you need the lag for.

    # File 'lib/rdkafka/consumer.rb', line 137

    def lag(topic_partition_list, watermark_timeout_ms=100)
      out = {}
      topic_partition_list.to_h.each do |topic, partitions|
        # Query high watermarks for this topic's partitions
        # and compare to the offset in the list.
        topic_out = {}
        partitions.each do |p|
          low, high = query_watermark_offsets(
            topic,
            p.partition,
            watermark_timeout_ms
          )
          topic_out[p.partition] = high - p.offset
        end
        out[topic] = topic_out
      end
      out
    end
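The arithmetic in #lag is a subtraction of each partition's offset from its high watermark. Below is a broker-free sketch with made-up numbers, where lag_for and the two input hashes are hypothetical stand-ins for the topic partition list and the watermark queries.

```ruby
# offsets:         { topic => { partition => committed offset } }
# high_watermarks: { topic => { partition => high watermark } }
def lag_for(offsets, high_watermarks)
  offsets.each_with_object({}) do |(topic, partitions), out|
    out[topic] = partitions.each_with_object({}) do |(partition, offset), topic_out|
      topic_out[partition] = high_watermarks.fetch(topic).fetch(partition) - offset
    end
  end
end

offsets         = { "events" => { 0 => 100, 1 => 250 } }
high_watermarks = { "events" => { 0 => 130, 1 => 250 } }
p lag_for(offsets, high_watermarks)
```

With these inputs, partition 0 is 30 messages behind and partition 1 is caught up. A partition whose offset is the -1001 "no stored offset" value would report a lag of its high watermark plus 1001, so such entries are probably worth filtering out first.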
#poll(timeout_ms) ⇒ Message?
Poll for the next message on one of the subscribed topics.

    # File 'lib/rdkafka/consumer.rb', line 177

    def poll(timeout_ms)
      message_ptr = Rdkafka::Bindings.rd_kafka_consumer_poll(@native_kafka, timeout_ms)
      if message_ptr.null?
        nil
      else
        # Create struct wrapper
        native_message = Rdkafka::Bindings::Message.new(message_ptr)
        # Raise error if needed
        if native_message[:err] != 0
          raise Rdkafka::RdkafkaError.new(native_message[:err])
        end
        # Create a message to pass out
        Rdkafka::Consumer::Message.new(native_message)
      end
    ensure
      # Clean up rdkafka message if there is one
      if !message_ptr.nil? && !message_ptr.null?
        Rdkafka::Bindings.rd_kafka_message_destroy(message_ptr)
      end
    end
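One way to use #poll directly is a bounded loop that gives up after several consecutive empty polls. The helper below is a sketch and not part of the library: drain, max_empty_polls and StubPoller are hypothetical, and the helper only assumes an object whose poll(timeout_ms) returns a message or nil.

```ruby
# Collects messages until `max_empty_polls` consecutive polls return nil.
def drain(consumer, timeout_ms: 250, max_empty_polls: 3)
  messages = []
  empty_polls = 0
  while empty_polls < max_empty_polls
    message = consumer.poll(timeout_ms)
    if message
      messages << message
      empty_polls = 0 # a received message resets the counter
    else
      empty_polls += 1
    end
  end
  messages
end

# Stub standing in for a real consumer so the sketch runs without a broker.
class StubPoller
  def initialize(results)
    @results = results.dup
  end

  def poll(_timeout_ms)
    @results.shift
  end
end

puts drain(StubPoller.new(["m1", nil, "m2"])).inspect
```

With a real consumer, #poll raises Rdkafka::RdkafkaError when the received message carries an error, so a production loop would likely rescue that as well.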
#query_watermark_offsets(topic, partition, timeout_ms = 200) ⇒ Integer
Query broker for low (oldest/beginning) and high (newest/end) offsets for a partition. Both values are returned, the low offset first and the high offset second.

    # File 'lib/rdkafka/consumer.rb', line 107

    def query_watermark_offsets(topic, partition, timeout_ms=200)
      low = FFI::MemoryPointer.new(:int64, 1)
      high = FFI::MemoryPointer.new(:int64, 1)

      response = Rdkafka::Bindings.rd_kafka_query_watermark_offsets(
        @native_kafka,
        topic,
        partition,
        low,
        high,
        timeout_ms
      )
      if response != 0
        raise Rdkafka::RdkafkaError.new(response, "Error querying watermark offsets for partition #{partition} of #{topic}")
      end

      return low.read_int64, high.read_int64
    end
#subscribe(*topics) ⇒ nil
Subscribe to one or more topics letting Kafka handle partition assignments.

    # File 'lib/rdkafka/consumer.rb', line 29

    def subscribe(*topics)
      # Create topic partition list with topics and no partition set
      tpl = Rdkafka::Bindings.rd_kafka_topic_partition_list_new(topics.length)
      topics.each do |topic|
        Rdkafka::Bindings.rd_kafka_topic_partition_list_add(
          tpl,
          topic,
          -1
        )
      end
      # Subscribe to topic partition list and check this was successful
      response = Rdkafka::Bindings.rd_kafka_subscribe(@native_kafka, tpl)
      if response != 0
        raise Rdkafka::RdkafkaError.new(response, "Error subscribing to '#{topics.join(', ')}'")
      end
    ensure
      # Clean up the topic partition list
      Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl)
    end
#subscription ⇒ TopicPartitionList
Return the current subscription to topics and partitions.

    # File 'lib/rdkafka/consumer.rb', line 66

    def subscription
      tpl = FFI::MemoryPointer.new(:pointer)
      response = Rdkafka::Bindings.rd_kafka_subscription(@native_kafka, tpl)
      if response != 0
        raise Rdkafka::RdkafkaError.new(response)
      end
      Rdkafka::Consumer::TopicPartitionList.new(tpl.get_pointer(0))
    end
#unsubscribe ⇒ nil
Unsubscribe from all subscribed topics.

    # File 'lib/rdkafka/consumer.rb', line 54

    def unsubscribe
      response = Rdkafka::Bindings.rd_kafka_unsubscribe(@native_kafka)
      if response != 0
        raise Rdkafka::RdkafkaError.new(response)
      end
    end
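#subscribe and #unsubscribe pair naturally in a block helper that always unsubscribes, even when the block raises. with_subscription and RecordingConsumer are hypothetical names for this sketch; the helper relies only on the subscribe(*topics) and unsubscribe signatures documented above.

```ruby
# Subscribes, yields the consumer, and unsubscribes once the block finishes.
def with_subscription(consumer, *topics)
  consumer.subscribe(*topics)
  begin
    yield consumer
  ensure
    consumer.unsubscribe # runs whether the block returned or raised
  end
end

# Stub that records calls so the sketch runs without a broker.
class RecordingConsumer
  attr_reader :calls

  def initialize
    @calls = []
  end

  def subscribe(*topics)
    @calls << [:subscribe, topics]
    nil
  end

  def unsubscribe
    @calls << [:unsubscribe]
    nil
  end
end

recorder = RecordingConsumer.new
with_subscription(recorder, "events", "audit") { |c| c.calls.length }
p recorder.calls
```

The inner begin/ensure keeps unsubscribe from running when subscribe itself fails, which seems the safer default for a real consumer.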