Class: Kafka::Consumer
- Inherits: Object
- Defined in: lib/kafka/consumer.rb
Overview
This code is still alpha level. Don't use this for anything important. The API may also change without warning.
A client that consumes messages from a Kafka cluster in coordination with other clients.
A Consumer subscribes to one or more Kafka topics; all consumers with the same group id then agree on who should read from the individual topic partitions. When group members join or leave, the group synchronizes, making sure that all partitions are assigned to a single member, and that all members have some partitions to read from.
Example
A simple consumer that writes the messages it consumes to the console.
require "kafka"
kafka = Kafka.new(seed_brokers: ["kafka1:9092", "kafka2:9092"])
# Create a new Consumer instance in the group `my-group`:
consumer = kafka.consumer(group_id: "my-group")
# Subscribe to a Kafka topic:
consumer.subscribe("messages")
# Loop forever, reading in messages from all topics that have been
# subscribed to.
consumer.each_message do |message|
  puts message.topic
  puts message.partition
  puts message.key
  puts message.value
  puts message.offset
end
Instance Method Summary
- #each_batch ⇒ Object
- #each_message {|message| ... } ⇒ nil
  Fetches and enumerates the messages in the topics that the consumer group subscribes to.
- #initialize(cluster:, logger:, group:, offset_manager:, session_timeout:) ⇒ Consumer (constructor)
  A new instance of Consumer.
- #stop ⇒ Object
- #subscribe(topic, default_offset: :earliest) ⇒ nil
  Subscribes the consumer to a topic.
Constructor Details
#initialize(cluster:, logger:, group:, offset_manager:, session_timeout:) ⇒ Consumer
Returns a new instance of Consumer.
# File 'lib/kafka/consumer.rb', line 46

def initialize(cluster:, logger:, group:, offset_manager:, session_timeout:)
  @cluster = cluster
  @logger = logger
  @group = group
  @offset_manager = offset_manager
  @session_timeout = session_timeout

  # Send two heartbeats in each session window, just to be sure.
  @heartbeat_interval = @session_timeout / 2

  # Whether or not the consumer is currently consuming messages.
  @running = false
end
Instance Method Details
#each_batch ⇒ Object
# File 'lib/kafka/consumer.rb', line 145

def each_batch
  loop do
    begin
      fetch_batches.each do |batch|
        unless batch.empty?
          Instrumentation.instrument("process_batch.consumer.kafka") do |notification|
            notification.update(
              topic: batch.topic,
              partition: batch.partition,
              highwater_mark_offset: batch.highwater_mark_offset,
              message_count: batch.messages.count,
            )

            yield batch
          end

          mark_message_as_processed(batch.messages.last)
        end

        @offset_manager.commit_offsets_if_necessary

        send_heartbeat_if_necessary
      end
    rescue ConnectionError => e
      @logger.error "Connection error while sending heartbeat; rejoining"
      join_group
    rescue UnknownMemberId
      @logger.error "Kicked out of group; rejoining"
      join_group
    rescue RebalanceInProgress
      @logger.error "Group is rebalancing; rejoining"
      join_group
    rescue IllegalGeneration
      @logger.error "Group has transitioned to a new generation; rejoining"
      join_group
    end
  end
ensure
  # In order to quickly have the consumer group re-balance itself, it's
  # important that members explicitly tell Kafka when they're leaving.
  @offset_manager.commit_offsets
  @group.leave
end
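Each fetched batch is yielded as a whole, with its topic, partition, messages, and high-water mark offset; offsets are marked and heartbeats sent per batch rather than per message. A minimal usage sketch, in which the topic name and handle_event are made up:

consumer.subscribe("events")

consumer.each_batch do |batch|
  puts "Fetched #{batch.messages.count} messages from #{batch.topic}/#{batch.partition}"

  batch.messages.each do |message|
    handle_event(message.value) # hypothetical application method
  end
end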
#each_message {|message| ... } ⇒ nil
Fetches and enumerates the messages in the topics that the consumer group subscribes to.
Each message is yielded to the provided block. If the block returns without raising an exception, the message will be considered successfully processed. At regular intervals the offset of the most recent successfully processed message in each partition will be committed to the Kafka offset store. If the consumer crashes or leaves the group, the group member that is tasked with taking over processing of these partitions will resume at the last committed offsets.
# File 'lib/kafka/consumer.rb', line 91

def each_message
  @running = true

  while @running
    begin
      fetch_batches.each do |batch|
        batch.messages.each do |message|
          Instrumentation.instrument("process_message.consumer.kafka") do |notification|
            notification.update(
              topic: message.topic,
              partition: message.partition,
              offset: message.offset,
              offset_lag: batch.highwater_mark_offset - message.offset,
              key: message.key,
              value: message.value,
            )

            yield message
          end

          @offset_manager.commit_offsets_if_necessary
          send_heartbeat_if_necessary
          mark_message_as_processed(message)

          break if !@running
        end
      end
    rescue ConnectionError => e
      @logger.error "Connection error while sending heartbeat; rejoining"
      join_group
    rescue UnknownMemberId
      @logger.error "Kicked out of group; rejoining"
      join_group
    rescue RebalanceInProgress
      @logger.error "Group is rebalancing; rejoining"
      join_group
    rescue IllegalGeneration
      @logger.error "Group has transitioned to a new generation; rejoining"
      join_group
    end
  end
ensure
  # In order to quickly have the consumer group re-balance itself, it's
  # important that members explicitly tell Kafka when they're leaving.
  @offset_manager.commit_offsets
  @group.leave

  @running = false
end
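Because a message's offset is only marked as processed after the block returns normally, the same message can be delivered again after a crash or rebalance, so processing should be idempotent. A minimal sketch, where the topic name and store_event are placeholders:

consumer.subscribe("page-views")

consumer.each_message do |message|
  # If this raises, the message's offset is not marked as processed, and the
  # message will be redelivered after a restart or rebalance.
  store_event(message.key, message.value) # hypothetical persistence method
end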
#stop ⇒ Object
# File 'lib/kafka/consumer.rb', line 141

def stop
  @running = false
end
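Setting @running to false causes #each_message to exit its fetch loop, commit offsets, and leave the group. A common pattern is to call #stop from a signal handler so the process shuts down cleanly; a sketch:

# Stop consuming when the process receives SIGTERM, e.g. during a deploy.
trap("TERM") { consumer.stop }

consumer.each_message do |message|
  puts message.value
end

# When the loop exits, offsets have been committed and the group has been left.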
#subscribe(topic, default_offset: :earliest) ⇒ nil
Subscribes the consumer to a topic.
Typically you either want to start reading messages from the very beginning of the topic's partitions or you simply want to wait for new messages to be written. In the former case, set default_offset to :earliest (the default); in the latter, set it to :latest.
# File 'lib/kafka/consumer.rb', line 71

def subscribe(topic, default_offset: :earliest)
  @group.subscribe(topic)
  @offset_manager.set_default_offset(topic, default_offset)

  nil
end
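For illustration, both starting positions side by side; the topic names are made up. The default offset only comes into play when the group has no previously committed offset to resume from.

# Read the topic's partitions from the beginning (the default):
consumer.subscribe("page-views")

# Only wait for messages written after the group first starts consuming:
consumer.subscribe("notifications", default_offset: :latest)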