Class: Kafka::Consumer
- Inherits: Object
- Defined in: lib/kafka/consumer.rb
Overview
This code is still alpha level. Don't use this for anything important. The API may also change without warning.
A client that consumes messages from a Kafka cluster in coordination with other clients.
A Consumer subscribes to one or more Kafka topics; all consumers with the same group id then agree on who should read from the individual topic partitions. When group members join or leave, the group synchronizes, making sure that all partitions are assigned to a single member, and that all members have some partitions to read from.
Example
A simple consumer that writes the messages it consumes to the console.
require "kafka"
kafka = Kafka.new(seed_brokers: ["kafka1:9092", "kafka2:9092"])
# Create a new Consumer instance in the group `my-group`:
consumer = kafka.consumer(group_id: "my-group")
# Subscribe to a Kafka topic:
consumer.subscribe("messages")
# Loop forever, reading in messages from all topics that have been
# subscribed to.
consumer.each_message do |message|
  puts message.topic
  puts message.partition
  puts message.key
  puts message.value
  puts message.offset
end
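Because all members of a group coordinate on partition assignment, starting a second copy of this program with the same group id makes the group rebalance so that each process reads a disjoint set of partitions. A minimal sketch, reusing the placeholder brokers and topic from the example above:
require "kafka"

kafka = Kafka.new(seed_brokers: ["kafka1:9092", "kafka2:9092"])

# Joining the *same* group id splits the topic's partitions between this
# process and every other member of "my-group".
consumer = kafka.consumer(group_id: "my-group")
consumer.subscribe("messages")

consumer.each_message do |message|
  # Each member only sees messages from the partitions assigned to it.
  puts "#{message.partition}: #{message.value}"
end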
Instance Method Summary
- #each_batch(min_bytes: 1, max_wait_time: 5) {|batch| ... } ⇒ nil
  Fetches and enumerates the messages in the topics that the consumer group subscribes to.
- #each_message(min_bytes: 1, max_wait_time: 5) {|message| ... } ⇒ nil
  Fetches and enumerates the messages in the topics that the consumer group subscribes to.
- #initialize(cluster:, logger:, instrumenter:, group:, offset_manager:, session_timeout:, heartbeat:) ⇒ Consumer (constructor)
  A new instance of Consumer.
- #stop ⇒ Object
- #subscribe(topic, default_offset: nil, start_from_beginning: true, max_bytes_per_partition: 1048576) ⇒ nil
  Subscribes the consumer to a topic.
Constructor Details
#initialize(cluster:, logger:, instrumenter:, group:, offset_manager:, session_timeout:, heartbeat:) ⇒ Consumer
Returns a new instance of Consumer.
# File 'lib/kafka/consumer.rb', line 46

def initialize(cluster:, logger:, instrumenter:, group:, offset_manager:, session_timeout:, heartbeat:)
  @cluster = cluster
  @logger = logger
  @instrumenter = instrumenter
  @group = group
  @offset_manager = offset_manager
  @session_timeout = session_timeout
  @heartbeat = heartbeat

  # Whether or not the consumer is currently consuming messages.
  @running = false

  # The maximum number of bytes to fetch from a single partition, by topic.
  @max_bytes = {}
end
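In normal use this constructor is not called directly; as in the overview example, a Consumer is obtained through the client's #consumer method, which wires up these collaborators. A minimal sketch with placeholder broker addresses:
require "kafka"

kafka = Kafka.new(seed_brokers: ["kafka1:9092", "kafka2:9092"])

# The client builds the cluster, group, offset manager, and heartbeat
# collaborators and passes them to Consumer.new on your behalf.
consumer = kafka.consumer(group_id: "my-group")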
Instance Method Details
#each_batch(min_bytes: 1, max_wait_time: 5) {|batch| ... } ⇒ nil
Fetches and enumerates the messages in the topics that the consumer group subscribes to.
Each batch of messages is yielded to the provided block. If the block returns without raising an exception, the batch will be considered successfully processed. At regular intervals the offset of the most recent successfully processed message batch in each partition will be committed to the Kafka offset store. If the consumer crashes or leaves the group, the group member that is tasked with taking over processing of these partitions will resume at the last committed offsets.
# File 'lib/kafka/consumer.rb', line 172

def each_batch(min_bytes: 1, max_wait_time: 5)
  consumer_loop do
    batches = fetch_batches(min_bytes: min_bytes, max_wait_time: max_wait_time)

    batches.each do |batch|
      unless batch.empty?
        @instrumenter.instrument("process_batch.consumer") do |notification|
          notification.update(
            topic: batch.topic,
            partition: batch.partition,
            offset_lag: batch.offset_lag,
            highwater_mark_offset: batch.highwater_mark_offset,
            message_count: batch.messages.count,
          )

          begin
            yield batch
          rescue => e
            offset_range = batch.empty? ? "N/A" : [batch.first_offset, batch.last_offset].join("..")
            location = "#{batch.topic}/#{batch.partition} in offset range #{offset_range}"
            backtrace = e.backtrace.join("\n")

            @logger.error "Exception raised when processing #{location} -- #{e.class}: #{e}\n#{backtrace}"

            raise
          end
        end

        mark_message_as_processed(batch.messages.last)
      end

      @offset_manager.commit_offsets_if_necessary

      @heartbeat.send_if_necessary

      return if !@running
    end
  end
end
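Batch processing is useful when it is cheaper to handle messages in bulk, e.g. bulk inserts into a datastore. A minimal usage sketch; the topic name and the store_batch helper are hypothetical:
consumer.subscribe("page-views")

consumer.each_batch do |batch|
  puts "#{batch.topic}/#{batch.partition}: #{batch.messages.count} messages"

  # `store_batch` stands in for your own bulk-write code; if it raises,
  # the batch's offsets are not committed and it will be reprocessed.
  store_batch(batch.messages.map(&:value))
end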
#each_message(min_bytes: 1, max_wait_time: 5) {|message| ... } ⇒ nil
Fetches and enumerates the messages in the topics that the consumer group subscribes to.
Each message is yielded to the provided block. If the block returns without raising an exception, the message will be considered successfully processed. At regular intervals the offset of the most recent successfully processed message in each partition will be committed to the Kafka offset store. If the consumer crashes or leaves the group, the group member that is tasked with taking over processing of these partitions will resume at the last committed offsets.
# File 'lib/kafka/consumer.rb', line 112

def each_message(min_bytes: 1, max_wait_time: 5)
  consumer_loop do
    batches = fetch_batches(min_bytes: min_bytes, max_wait_time: max_wait_time)

    batches.each do |batch|
      batch.messages.each do |message|
        @instrumenter.instrument("process_message.consumer") do |notification|
          notification.update(
            topic: message.topic,
            partition: message.partition,
            offset: message.offset,
            offset_lag: batch.highwater_mark_offset - message.offset,
            key: message.key,
            value: message.value,
          )

          begin
            yield message
          rescue => e
            location = "#{message.topic}/#{message.partition} at offset #{message.offset}"
            backtrace = e.backtrace.join("\n")

            @logger.error "Exception raised when processing #{location} -- #{e.class}: #{e}\n#{backtrace}"

            raise
          end
        end

        mark_message_as_processed(message)

        @offset_manager.commit_offsets_if_necessary

        @heartbeat.send_if_necessary

        return if !@running
      end
    end

    # We may not have received any messages, but it's still a good idea to
    # commit offsets if we've processed messages in the last set of batches.
    @offset_manager.commit_offsets_if_necessary
  end
end
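As the implementation above shows, an exception raised by the block is re-raised after being logged, so the consumer stops before the failed message's offset is committed. When a bad message should be skipped rather than retried, one common pattern is to rescue inside the block; this is a sketch, with handle standing in for your own processing code:
require "json"

consumer.each_message do |message|
  begin
    handle(JSON.parse(message.value))
  rescue JSON::ParserError
    # Swallowing the error lets the message be marked as processed and the
    # loop move on; re-raising would stop the consumer at this offset.
    puts "Skipping malformed message at #{message.topic}/#{message.partition}, offset #{message.offset}"
  end
end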
#stop ⇒ Object
# File 'lib/kafka/consumer.rb', line 90

def stop
  @running = false
end
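Stopping only clears the running flag, so #each_message or #each_batch returns once the message or batch currently being processed is finished. A common way to use it (a sketch; process stands in for your own handling code) is from a signal handler so the process can shut down cleanly:
# Ask the consumer to exit its loop on SIGTERM.
trap("TERM") { consumer.stop }

consumer.each_message do |message|
  process(message)
end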
#subscribe(topic, default_offset: nil, start_from_beginning: true, max_bytes_per_partition: 1048576) ⇒ nil
Subscribes the consumer to a topic.
Typically you either want to start reading messages from the very beginning of the topic's partitions or you simply want to wait for new messages to be written. In the former case, set start_from_beginning to true (the default); in the latter, set it to false.
# File 'lib/kafka/consumer.rb', line 80

def subscribe(topic, default_offset: nil, start_from_beginning: true, max_bytes_per_partition: 1048576)
  default_offset ||= start_from_beginning ? :earliest : :latest

  @group.subscribe(topic)
  @offset_manager.set_default_offset(topic, default_offset)

  @max_bytes[topic] = max_bytes_per_partition

  nil
end
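A short sketch of the keyword arguments in use; the topic names are placeholders. Per the implementation above, start_from_beginning only determines the default offset used when the group has no committed offset for a partition, and max_bytes_per_partition caps how much data is fetched from each partition of that topic:
# Start at the latest offsets if the group has no committed offsets yet,
# rather than reading the topic from the beginning:
consumer.subscribe("click-events", start_from_beginning: false)

# Cap fetches from each partition of this topic at 512 KB:
consumer.subscribe("large-payloads", max_bytes_per_partition: 512 * 1024)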