Class: Kafka::Consumer
- Inherits:
-
Object
- Object
- Kafka::Consumer
- Defined in:
- lib/kafka/consumer.rb
Overview
This code is still alpha level. Don't use this for anything important. The API may also change without warning.
A client that consumes messages from a Kafka cluster in coordination with other clients.
A Consumer subscribes to one or more Kafka topics; all consumers with the same group id then agree on who should read from the individual topic partitions. When group members join or leave, the group synchronizes, making sure that all partitions are assigned to a single member, and that all members have some partitions to read from.
Example
A simple producer that simply writes the messages it consumes to the console.
require "kafka"
kafka = Kafka.new(seed_brokers: ["kafka1:9092", "kafka2:9092"])
# Create a new Consumer instance in the group `my-group`:
consumer = kafka.consumer(group_id: "my-group")
# Subscribe to a Kafka topic:
consumer.subscribe("messages")
# Loop forever, reading in messages from all topics that have been
# subscribed to.
consumer. do ||
puts .topic
puts .partition
puts .key
puts .value
puts .offset
end
Instance Method Summary collapse
- #commit_offsets ⇒ Object
-
#each_batch(min_bytes: 1, max_wait_time: 5, automatically_mark_as_processed: true) {|batch| ... } ⇒ nil
Fetches and enumerates the messages in the topics that the consumer group subscribes to.
-
#each_message(min_bytes: 1, max_wait_time: 5, automatically_mark_as_processed: true) {|message| ... } ⇒ nil
Fetches and enumerates the messages in the topics that the consumer group subscribes to.
-
#initialize(cluster:, logger:, instrumenter:, group:, offset_manager:, session_timeout:, heartbeat:) ⇒ Consumer
constructor
A new instance of Consumer.
- #mark_message_as_processed(message) ⇒ Object
-
#pause(topic, partition, timeout: nil) ⇒ nil
Pause processing of a specific topic partition.
-
#paused?(topic, partition) ⇒ Boolean
Whether the topic partition is currently paused.
-
#resume(topic, partition) ⇒ nil
Resume processing of a topic partition.
- #send_heartbeat_if_necessary ⇒ Object
- #stop ⇒ Object
-
#subscribe(topic, default_offset: nil, start_from_beginning: true, max_bytes_per_partition: 1048576) ⇒ nil
Subscribes the consumer to a topic.
Constructor Details
#initialize(cluster:, logger:, instrumenter:, group:, offset_manager:, session_timeout:, heartbeat:) ⇒ Consumer
Returns a new instance of Consumer.
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/kafka/consumer.rb', line 46 def initialize(cluster:, logger:, instrumenter:, group:, offset_manager:, session_timeout:, heartbeat:) @cluster = cluster @logger = logger @instrumenter = instrumenter @group = group @offset_manager = offset_manager @session_timeout = session_timeout @heartbeat = heartbeat # A list of partitions that have been paused, per topic. @paused_partitions = {} # Whether or not the consumer is currently consuming messages. @running = false # The maximum number of bytes to fetch from a single partition, by topic. @max_bytes = {} end |
Instance Method Details
#commit_offsets ⇒ Object
283 284 285 |
# File 'lib/kafka/consumer.rb', line 283 def commit_offsets @offset_manager.commit_offsets end |
#each_batch(min_bytes: 1, max_wait_time: 5, automatically_mark_as_processed: true) {|batch| ... } ⇒ nil
Fetches and enumerates the messages in the topics that the consumer group subscribes to.
Each batch of messages is yielded to the provided block. If the block returns without raising an exception, the batch will be considered successfully processed. At regular intervals the offset of the most recent successfully processed message batch in each partition will be committed to the Kafka offset store. If the consumer crashes or leaves the group, the group member that is tasked with taking over processing of these partitions will resume at the last committed offsets.
243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 |
# File 'lib/kafka/consumer.rb', line 243 def each_batch(min_bytes: 1, max_wait_time: 5, automatically_mark_as_processed: true) consumer_loop do batches = fetch_batches(min_bytes: min_bytes, max_wait_time: max_wait_time) batches.each do |batch| unless batch.empty? @instrumenter.instrument("process_batch.consumer") do |notification| notification.update( topic: batch.topic, partition: batch.partition, offset_lag: batch.offset_lag, highwater_mark_offset: batch.highwater_mark_offset, message_count: batch..count, ) begin yield batch rescue => e offset_range = (batch.first_offset .. batch.last_offset) location = "#{batch.topic}/#{batch.partition} in offset range #{offset_range}" backtrace = e.backtrace.join("\n") @logger.error "Exception raised when processing #{location} -- #{e.class}: #{e}\n#{backtrace}" raise ProcessingError.new(batch.topic, batch.partition, offset_range) end end (batch..last) if automatically_mark_as_processed end @offset_manager.commit_offsets_if_necessary @heartbeat.send_if_necessary return if !@running end end end |
#each_message(min_bytes: 1, max_wait_time: 5, automatically_mark_as_processed: true) {|message| ... } ⇒ nil
Fetches and enumerates the messages in the topics that the consumer group subscribes to.
Each message is yielded to the provided block. If the block returns without raising an exception, the message will be considered successfully processed. At regular intervals the offset of the most recent successfully processed message in each partition will be committed to the Kafka offset store. If the consumer crashes or leaves the group, the group member that is tasked with taking over processing of these partitions will resume at the last committed offsets.
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 |
# File 'lib/kafka/consumer.rb', line 179 def (min_bytes: 1, max_wait_time: 5, automatically_mark_as_processed: true) consumer_loop do batches = fetch_batches(min_bytes: min_bytes, max_wait_time: max_wait_time) batches.each do |batch| batch..each do || @instrumenter.instrument("process_message.consumer") do |notification| notification.update( topic: .topic, partition: .partition, offset: .offset, offset_lag: batch.highwater_mark_offset - .offset - 1, key: .key, value: .value, ) begin yield rescue => e location = "#{.topic}/#{.partition} at offset #{.offset}" backtrace = e.backtrace.join("\n") @logger.error "Exception raised when processing #{location} -- #{e.class}: #{e}\n#{backtrace}" raise ProcessingError.new(.topic, .partition, .offset) end end () if automatically_mark_as_processed @offset_manager.commit_offsets_if_necessary @heartbeat.send_if_necessary return if !@running end end # We may not have received any messages, but it's still a good idea to # commit offsets if we've processed messages in the last set of batches. @offset_manager.commit_offsets_if_necessary end end |
#mark_message_as_processed(message) ⇒ Object
287 288 289 |
# File 'lib/kafka/consumer.rb', line 287 def () @offset_manager.mark_as_processed(.topic, .partition, .offset) end |
#pause(topic, partition, timeout: nil) ⇒ nil
Pause processing of a specific topic partition.
When a specific message causes the processor code to fail, it can be a good idea to simply pause the partition until the error can be resolved, allowing the rest of the partitions to continue being processed.
If the timeout
argument is passed, the partition will automatically be
resumed when the timeout expires.
111 112 113 114 |
# File 'lib/kafka/consumer.rb', line 111 def pause(topic, partition, timeout: nil) @paused_partitions[topic] ||= {} @paused_partitions[topic][partition] = timeout && Time.now + timeout end |
#paused?(topic, partition) ⇒ Boolean
Whether the topic partition is currently paused.
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/kafka/consumer.rb', line 133 def paused?(topic, partition) partitions = @paused_partitions.fetch(topic, {}) if partitions.key?(partition) # Users can set an optional timeout, after which the partition is # automatically resumed. When pausing, the timeout is translated to an # absolute point in time. timeout = partitions.fetch(partition) if timeout.nil? true elsif Time.now < timeout true else @logger.info "Automatically resuming partition #{topic}/#{partition}, pause timeout expired" resume(topic, partition) false end end end |
#resume(topic, partition) ⇒ nil
Resume processing of a topic partition.
122 123 124 125 |
# File 'lib/kafka/consumer.rb', line 122 def resume(topic, partition) paused_partitions = @paused_partitions.fetch(topic, {}) paused_partitions.delete(partition) end |
#send_heartbeat_if_necessary ⇒ Object
291 292 293 |
# File 'lib/kafka/consumer.rb', line 291 def send_heartbeat_if_necessary @heartbeat.send_if_necessary end |
#stop ⇒ Object
93 94 95 |
# File 'lib/kafka/consumer.rb', line 93 def stop @running = false end |
#subscribe(topic, default_offset: nil, start_from_beginning: true, max_bytes_per_partition: 1048576) ⇒ nil
Subscribes the consumer to a topic.
Typically you either want to start reading messages from the very
beginning of the topic's partitions or you simply want to wait for new
messages to be written. In the former case, set start_from_beginning
to true (the default); in the latter, set it to false.
83 84 85 86 87 88 89 90 91 |
# File 'lib/kafka/consumer.rb', line 83 def subscribe(topic, default_offset: nil, start_from_beginning: true, max_bytes_per_partition: 1048576) default_offset ||= start_from_beginning ? :earliest : :latest @group.subscribe(topic) @offset_manager.set_default_offset(topic, default_offset) @max_bytes[topic] = max_bytes_per_partition nil end |