Class: Kafka::Consumer
- Inherits: Object
  - Object
    - Kafka::Consumer
- Defined in: lib/kafka/consumer.rb
Overview
This code is still alpha level. Don't use this for anything important. The API may also change without warning.
A client that consumes messages from a Kafka cluster in coordination with other clients.
A Consumer subscribes to one or more Kafka topics; all consumers with the same group id then agree on who should read from the individual topic partitions. When group members join or leave, the group synchronizes, making sure that each partition is assigned to exactly one member and that all members have some partitions to read from.
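To make the "exactly one member per partition" rule concrete, here is a sketch of one simple strategy: round-robin over the member ids, sorted. This is an assumed strategy for illustration only (`assign_partitions` is a hypothetical helper); the overview doesn't say which algorithm the group actually uses.

```ruby
# Hypothetical helper: round-robin assignment of topic partitions to group
# members. An illustrative strategy, not necessarily the one ruby-kafka uses.
def assign_partitions(members, partitions)
  raise ArgumentError, "a group needs at least one member" if members.empty?

  # Sort member ids so that every run produces the same assignment.
  sorted_members = members.sort
  assignment = sorted_members.to_h { |member| [member, []] }

  # Hand out partitions one at a time, cycling through the members, so each
  # partition ends up with exactly one member.
  partitions.each_with_index do |partition, index|
    assignment[sorted_members[index % sorted_members.size]] << partition
  end

  assignment
end

partitions = [["messages", 0], ["messages", 1], ["messages", 2]]
p assign_partitions(["consumer-b", "consumer-a"], partitions)
```

Sorting the member ids makes the result deterministic, so recomputing the assignment from the same membership always gives the same answer.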
Example
A simple consumer that writes the messages it consumes to the console.
require "kafka"

kafka = Kafka.new(seed_brokers: ["kafka1:9092", "kafka2:9092"])

# Create a new Consumer instance in the group `my-group`:
consumer = kafka.consumer(group_id: "my-group")

# Subscribe to a Kafka topic:
consumer.subscribe("messages")

begin
  # Loop forever, reading in messages from all topics that have been
  # subscribed to.
  consumer.each_message do |message|
    puts message.topic
    puts message.partition
    puts message.key
    puts message.value
    puts message.offset
  end
ensure
  # Make sure to shut down the consumer after use. This lets
  # the consumer notify the Kafka cluster that it's leaving
  # the group, causing a synchronization and re-balancing of
  # the group.
  consumer.shutdown
end
Instance Method Summary
- #each_message {|message| ... } ⇒ nil
  Fetches and enumerates the messages in the topics that the consumer group subscribes to.
- #initialize(cluster:, logger:, group:, offset_manager:, session_timeout:) ⇒ Consumer (constructor)
  A new instance of Consumer.
- #shutdown ⇒ nil
  Shuts down the consumer.
- #subscribe(topic, default_offset: :earliest) ⇒ nil
  Subscribes the consumer to a topic.
Constructor Details
#initialize(cluster:, logger:, group:, offset_manager:, session_timeout:) ⇒ Consumer
Returns a new instance of Consumer.
# File 'lib/kafka/consumer.rb', line 54

def initialize(cluster:, logger:, group:, offset_manager:, session_timeout:)
  @cluster = cluster
  @logger = logger
  @group = group
  @offset_manager = offset_manager
  @session_timeout = session_timeout

  # Send two heartbeats in each session window, just to be sure.
  @heartbeat_interval = @session_timeout / 2
end
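The constructor sets the heartbeat interval to half the session timeout so that two heartbeats fit in each session window. The body of `send_heartbeat_if_necessary` isn't shown on this page, so the following is only a sketch of how interval-based heartbeating could work, assuming a monotonic clock; `HeartbeatTimer` and `beat_if_necessary` are hypothetical names, not part of ruby-kafka.

```ruby
# Hypothetical sketch of interval-based heartbeating; HeartbeatTimer and
# beat_if_necessary are illustrative names, not ruby-kafka internals.
class HeartbeatTimer
  attr_reader :interval

  # session_timeout: seconds before the group considers a silent member gone.
  # clock: callable returning seconds; defaults to a monotonic clock.
  def initialize(session_timeout:, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    # Two heartbeats per session window, as in the constructor above.
    @interval = session_timeout / 2.0
    @clock = clock
    @last_heartbeat = nil
  end

  # Runs the given block (which would send the heartbeat) if none has been
  # sent yet or at least `interval` seconds have passed since the last one.
  # Returns true if the block ran.
  def beat_if_necessary
    now = @clock.call
    return false if @last_heartbeat && now - @last_heartbeat < @interval

    yield
    @last_heartbeat = now
    true
  end
end

now = 0
timer = HeartbeatTimer.new(session_timeout: 30, clock: -> { now })
sent = []
[0, 10, 15, 20, 31].each do |t|
  now = t
  timer.beat_if_necessary { sent << t }
end
p sent # => [0, 15, 31]
```

The sketch divides by `2.0` so an odd Integer timeout isn't truncated (in Ruby, `5 / 2` is `2`), and the clock is injectable so the timing can be checked without sleeping.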
Instance Method Details
#each_message {|message| ... } ⇒ nil
Fetches and enumerates the messages in the topics that the consumer group subscribes to.
Each message is yielded to the provided block. If the block returns without raising an exception, the message will be considered successfully processed. At regular intervals the offset of the most recent successfully processed message in each partition will be committed to the Kafka offset store. If the consumer crashes or leaves the group, the group member that is tasked with taking over processing of these partitions will resume at the last committed offsets.
# File 'lib/kafka/consumer.rb', line 96

def each_message
  loop do
    begin
      batch = fetch_batch

      batch.each do |message|
        Instrumentation.instrument("process_message.consumer.kafka") do |notification|
          notification.update(
            topic: message.topic,
            partition: message.partition,
            offset: message.offset,
            key: message.key,
            value: message.value,
          )

          yield message
        end

        @offset_manager.commit_offsets_if_necessary

        send_heartbeat_if_necessary
        ()
      end
    rescue ConnectionError => e
      @logger.error "Connection error while sending heartbeat; rejoining"
      join_group
    rescue UnknownMemberId
      @logger.error "Kicked out of group; rejoining"
      join_group
    rescue RebalanceInProgress
      @logger.error "Group is rebalancing; rejoining"
      join_group
    rescue IllegalGeneration
      @logger.error "Group has transitioned to a new generation; rejoining"
      join_group
    end
  end
end
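The commit behavior described for `each_message` can be sketched as a small tracker that remembers the most recent successfully processed offset per partition and only commits once a time interval has passed. `IntervalOffsetCommitter`, its methods, and the `commit_interval` setting are hypothetical illustrations, not ruby-kafka's offset manager API; the commit itself is delegated to a block standing in for a request to the Kafka offset store.

```ruby
# Hypothetical sketch of interval-based offset committing. The class and its
# methods are illustrative, not ruby-kafka's OffsetManager API.
class IntervalOffsetCommitter
  # commit_interval: seconds between commits (an assumed setting).
  # clock:           callable returning the current time in seconds.
  # commit block:    receives { [topic, partition] => offset }; stands in
  #                  for a request to the Kafka offset store.
  def initialize(commit_interval:, clock:, &commit)
    @commit_interval = commit_interval
    @clock = clock
    @commit = commit
    @processed = {}
    @last_commit = clock.call
  end

  # Remember the most recent successfully processed offset per partition.
  def mark_as_processed(topic, partition, offset)
    @processed[[topic, partition]] = offset
  end

  # Commit only once the interval has elapsed since the last commit.
  def commit_if_necessary
    return false if @clock.call - @last_commit < @commit_interval

    commit
  end

  # Commit whatever has been processed since the last commit (for example
  # on shutdown). Returns true if anything was committed.
  def commit
    @last_commit = @clock.call
    return false if @processed.empty?

    @commit.call(@processed.dup)
    @processed.clear
    true
  end
end

now = 0
commits = []
committer = IntervalOffsetCommitter.new(commit_interval: 10, clock: -> { now }) { |offsets| commits << offsets }

[[0, 41], [3, 42], [12, 43]].each do |time, offset|
  now = time
  # Only reached if processing the message didn't raise.
  committer.mark_as_processed("messages", 0, offset)
  committer.commit_if_necessary
end

p commits.size # => 1
```

Because a message is only marked after processing returns, an exception raised while handling it leaves the previously marked offset as the latest one eligible for commit.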
#shutdown ⇒ nil
Shuts down the consumer.
To let the consumer group re-balance quickly, it's important that
members explicitly tell Kafka when they're leaving, so it's a good idea
to call this method whenever your consumer is about to quit. If it isn't
called, it may take up to the time given by the session_timeout
parameter for Kafka to notice that this consumer is gone and trigger a
group re-balance. During that time, the partitions previously assigned
to this consumer won't be processed.
# File 'lib/kafka/consumer.rb', line 147

def shutdown
  @offset_manager.commit_offsets
  @group.leave
rescue ConnectionError
end
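The advice above comes down to one pattern: stop consuming by leaving the `each_message` block (for example with `break`) and always call `shutdown` from an `ensure` clause. So that the sketch runs without a Kafka cluster, it uses a hypothetical `StubConsumer` that only mimics the `each_message`/`shutdown` shape; `consume` and `max_messages` are likewise illustrative names.

```ruby
# Hypothetical stand-in for Kafka::Consumer so the pattern runs without a
# Kafka cluster. It replays a fixed list of offsets forever, the way the
# real each_message loops until interrupted.
class StubConsumer
  Message = Struct.new(:topic, :partition, :offset)

  def initialize(offsets)
    @offsets = offsets
    @shut_down = false
  end

  def each_message
    loop do
      @offsets.each { |offset| yield Message.new("messages", 0, offset) }
    end
  end

  def shutdown
    @shut_down = true
    nil
  end

  def shut_down?
    @shut_down
  end
end

# Hypothetical helper: consume until a stop condition holds, then always
# leave the group. Here the condition is a message count; a real process
# might instead check a flag set from a Signal.trap("TERM") handler.
def consume(consumer, max_messages:)
  seen = []
  begin
    consumer.each_message do |message|
      seen << message.offset
      # `break` returns from each_message itself, so the ensure clause runs.
      break if seen.size >= max_messages
    end
  ensure
    consumer.shutdown
  end
  seen
end

consumer = StubConsumer.new([1, 2, 3])
p consume(consumer, max_messages: 5) # => [1, 2, 3, 1, 2]
p consumer.shut_down?                # => true
```

Putting `shutdown` in `ensure` means it also runs when processing raises, so the group is left explicitly instead of waiting out the session timeout.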
#subscribe(topic, default_offset: :earliest) ⇒ nil
Subscribes the consumer to a topic.
Typically you either want to start reading messages from the very
beginning of the topic's partitions or you simply want to wait for new
messages to be written. In the former case, set default_offset to
:earliest (the default); in the latter, set it to :latest.
# File 'lib/kafka/consumer.rb', line 76

def subscribe(topic, default_offset: :earliest)
  @group.subscribe(topic)
  @offset_manager.set_default_offset(topic, default_offset)

  nil
end
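The choice between `:earliest` and `:latest` can be pictured as a small function that picks where a partition's consumption starts. It assumes, as with Kafka's `auto.offset.reset` consumer setting, that the default offset only matters when the group has no committed offset for the partition; `starting_offset`, `log_start`, and `high_watermark` are hypothetical names used for illustration.

```ruby
# Hypothetical helper: pick the offset a partition's consumption starts from.
# Assumes default_offset only applies when the group has no committed offset
# for the partition (as with Kafka's auto.offset.reset setting).
#   log_start:      earliest offset still available in the partition
#   high_watermark: offset just past the latest message available to consumers
def starting_offset(committed:, default_offset:, log_start:, high_watermark:)
  # A committed offset always wins: resume where the group left off.
  return committed unless committed.nil?

  case default_offset
  when :earliest then log_start      # read from the very beginning
  when :latest   then high_watermark # only wait for new messages
  else
    raise ArgumentError, "unknown default_offset: #{default_offset.inspect}"
  end
end

p starting_offset(committed: nil, default_offset: :earliest, log_start: 0, high_watermark: 100) # => 0
p starting_offset(committed: nil, default_offset: :latest, log_start: 0, high_watermark: 100)   # => 100
p starting_offset(committed: 42, default_offset: :latest, log_start: 0, high_watermark: 100)    # => 42
```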