Class: Kafka::Consumer
Inherits: Object
Defined in: lib/kafka/consumer.rb
Overview
This code is still alpha level. Don’t use this for anything important. The API may also change without warning.
A client that consumes messages from a Kafka cluster in coordination with other clients.
A Consumer subscribes to one or more Kafka topics; all consumers with the same group id then agree on who should read from the individual topic partitions. When group members join or leave, the group synchronizes, making sure that all partitions are assigned to a single member, and that all members have some partitions to read from.
Example
A simple consumer that writes the messages it consumes to the console.
require "kafka"
kafka = Kafka.new(seed_brokers: ["kafka1:9092", "kafka2:9092"])
# Create a new Consumer instance in the group `my-group`:
consumer = kafka.consumer(group_id: "my-group")
# Subscribe to a Kafka topic:
consumer.subscribe("messages")
# Loop forever, reading in messages from all topics that have been
# subscribed to.
consumer.each_message do |message|
  puts message.topic
  puts message.partition
  puts message.key
  puts message.value
  puts message.offset
end
Instance Method Summary
-
#each_batch(min_bytes: 1, max_wait_time: 5) {|batch| ... } ⇒ nil
Fetches and enumerates the messages in the topics that the consumer group subscribes to.
-
#each_message(min_bytes: 1, max_wait_time: 5) {|message| ... } ⇒ nil
Fetches and enumerates the messages in the topics that the consumer group subscribes to.
-
#initialize(cluster:, logger:, instrumenter:, group:, offset_manager:, session_timeout:, heartbeat:) ⇒ Consumer
constructor
A new instance of Consumer.
-
#stop ⇒ Object
-
#subscribe(topic, default_offset: nil, start_from_beginning: true) ⇒ nil
Subscribes the consumer to a topic.
Constructor Details
#initialize(cluster:, logger:, instrumenter:, group:, offset_manager:, session_timeout:, heartbeat:) ⇒ Consumer
Returns a new instance of Consumer.
# File 'lib/kafka/consumer.rb', line 46
def initialize(cluster:, logger:, instrumenter:, group:, offset_manager:, session_timeout:, heartbeat:)
  @cluster = cluster
  @logger = logger
  @instrumenter = instrumenter
  @group = group
  @offset_manager = offset_manager
  @session_timeout = session_timeout
  @heartbeat = heartbeat

  # Whether or not the consumer is currently consuming messages.
  @running = false
end
Instance Method Details
#each_batch(min_bytes: 1, max_wait_time: 5) {|batch| ... } ⇒ nil
Fetches and enumerates the messages in the topics that the consumer group subscribes to.
Each batch of messages is yielded to the provided block. If the block returns without raising an exception, the batch will be considered successfully processed. At regular intervals the offset of the most recent successfully processed message batch in each partition will be committed to the Kafka offset store. If the consumer crashes or leaves the group, the group member that is tasked with taking over processing of these partitions will resume at the last committed offsets.
# File 'lib/kafka/consumer.rb', line 154
def each_batch(min_bytes: 1, max_wait_time: 5)
  consumer_loop do
    batches = fetch_batches(min_bytes: min_bytes, max_wait_time: max_wait_time)

    batches.each do |batch|
      unless batch.empty?
        @instrumenter.instrument("process_batch.consumer") do |notification|
          notification.update(
            topic: batch.topic,
            partition: batch.partition,
            offset_lag: batch.offset_lag,
            highwater_mark_offset: batch.highwater_mark_offset,
            message_count: batch.messages.count,
          )

          yield batch
        end

        mark_message_as_processed(batch.messages.last)
      end

      @offset_manager.commit_offsets_if_necessary
      @heartbeat.send_if_necessary

      return if !@running
    end
  end
end
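For reference, a batch-oriented consume loop might look like the following sketch. The broker addresses, group id, and the `events` topic are assumptions, not part of this API's contract; the `kafka` gem and a running cluster are required.

```ruby
require "kafka"

kafka = Kafka.new(seed_brokers: ["kafka1:9092", "kafka2:9092"])
consumer = kafka.consumer(group_id: "batch-group")
consumer.subscribe("events")

# Each yielded batch covers a single topic partition. If the block
# raises, the batch's offset is never committed, so the batch will be
# reprocessed after a restart or rebalance.
consumer.each_batch(min_bytes: 1024, max_wait_time: 10) do |batch|
  puts "#{batch.messages.count} messages from #{batch.topic}/#{batch.partition}"

  batch.messages.each do |message|
    # Handle each message here; raising aborts the whole batch.
  end
end
```

Compared to #each_message, #each_batch trades finer-grained offset tracking for fewer block invocations, which can help when processing messages in bulk (e.g. batched database writes).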
#each_message(min_bytes: 1, max_wait_time: 5) {|message| ... } ⇒ nil
Fetches and enumerates the messages in the topics that the consumer group subscribes to.
Each message is yielded to the provided block. If the block returns without raising an exception, the message will be considered successfully processed. At regular intervals the offset of the most recent successfully processed message in each partition will be committed to the Kafka offset store. If the consumer crashes or leaves the group, the group member that is tasked with taking over processing of these partitions will resume at the last committed offsets.
# File 'lib/kafka/consumer.rb', line 106
def each_message(min_bytes: 1, max_wait_time: 5)
  consumer_loop do
    batches = fetch_batches(min_bytes: min_bytes, max_wait_time: max_wait_time)

    batches.each do |batch|
      batch.messages.each do |message|
        @instrumenter.instrument("process_message.consumer") do |notification|
          notification.update(
            topic: message.topic,
            partition: message.partition,
            offset: message.offset,
            offset_lag: batch.highwater_mark_offset - message.offset,
            key: message.key,
            value: message.value,
          )

          yield message
        end

        mark_message_as_processed(message)

        @offset_manager.commit_offsets_if_necessary
        @heartbeat.send_if_necessary

        return if !@running
      end
    end
  end
end
#stop ⇒ Object
# File 'lib/kafka/consumer.rb', line 84
def stop
  @running = false
end
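Since #stop only flips a flag, the consume loop exits at its next iteration rather than being interrupted mid-message. One common pattern (a sketch; the consumer setup shown is an assumption) is to call it from a signal handler so the process can shut down without killing the block in mid-flight:

```ruby
require "kafka"

kafka = Kafka.new(seed_brokers: ["kafka1:9092", "kafka2:9092"])
consumer = kafka.consumer(group_id: "my-group")
consumer.subscribe("messages")

# Request a stop on SIGINT/SIGTERM. Setting an instance variable is
# safe inside a trap handler, and the loop checks the flag after each
# message, so the current message finishes processing first.
%w(INT TERM).each do |signal|
  trap(signal) { consumer.stop }
end

consumer.each_message do |message|
  puts message.value
end
```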
#subscribe(topic, default_offset: nil, start_from_beginning: true) ⇒ nil
Subscribes the consumer to a topic.
Typically you either want to start reading messages from the very beginning of the topic's partitions, or you simply want to wait for new messages to be written. In the former case, set start_from_beginning to true (the default); in the latter, set it to false.
# File 'lib/kafka/consumer.rb', line 75
def subscribe(topic, default_offset: nil, start_from_beginning: true)
  default_offset ||= start_from_beginning ? :earliest : :latest

  @group.subscribe(topic)
  @offset_manager.set_default_offset(topic, default_offset)

  nil
end
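For example, given a consumer built as in the overview example, the two modes might be combined like this (the topic names are assumptions for illustration):

```ruby
# Read the "audit-log" topic from the very beginning:
consumer.subscribe("audit-log", start_from_beginning: true)

# Only consume "metrics" messages written after the group first starts:
consumer.subscribe("metrics", start_from_beginning: false)

# Equivalently, pass an explicit default offset instead:
consumer.subscribe("metrics", default_offset: :latest)
```

Note that the default offset only applies to partitions with no previously committed offset for the group; once offsets have been committed, the consumer always resumes from those.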