Class: Kafka::Consumer

Inherits:
Object
Defined in:
lib/kafka/consumer.rb

Overview

Note:

This code is still alpha level. Don't use this for anything important. The API may also change without warning.

A client that consumes messages from a Kafka cluster in coordination with other clients.

A Consumer subscribes to one or more Kafka topics; all consumers with the same group id then agree on who should read from the individual topic partitions. When group members join or leave, the group synchronizes, making sure that each partition is assigned to exactly one member and that every member has some partitions to read from (as long as there are at least as many partitions as members).
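To make the "exactly one member per partition" invariant concrete, here is a minimal round-robin assignment in Ruby. It is a hypothetical sketch: the `assign_partitions` helper and the member names are invented for illustration, and this is not necessarily how Kafka::Consumer or the Kafka group coordinator actually distributes partitions.

```ruby
# Hypothetical helper (not part of ruby-kafka): spreads topic partitions
# across group members round-robin so each partition has exactly one owner.
def assign_partitions(members, partitions)
  raise ArgumentError, "group has no members" if members.empty?

  # Sort both inputs so every member computes the same assignment.
  sorted_members = members.sort

  # Start every member with an empty list, so members without partitions
  # still appear in the result.
  assignment = sorted_members.map { |member| [member, []] }.to_h

  partitions.sort.each_with_index do |partition, index|
    owner = sorted_members[index % sorted_members.size]
    assignment[owner] << partition
  end

  assignment
end
```

For example, `assign_partitions(["consumer-b", "consumer-a"], [["messages", 0], ["messages", 1], ["messages", 2]])` gives `consumer-a` partitions 0 and 2 and `consumer-b` partition 1. With three members and a single partition, two of the members receive an empty list.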

Example

A simple consumer that writes the messages it consumes to the console.

require "kafka"

kafka = Kafka.new(seed_brokers: ["kafka1:9092", "kafka2:9092"])

# Create a new Consumer instance in the group `my-group`:
consumer = kafka.consumer(group_id: "my-group")

# Subscribe to a Kafka topic:
consumer.subscribe("messages")

begin
  # Loop forever, reading in messages from all topics that have been
  # subscribed to.
  consumer.each_message do |message|
    puts message.topic
    puts message.partition
    puts message.key
    puts message.value
    puts message.offset
  end
ensure
  # Make sure to shut down the consumer after use. This lets
  # the consumer notify the Kafka cluster that it's leaving
  # the group, causing a synchronization and re-balancing of
  # the group.
  consumer.shutdown
end

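Instance Method Summary

  • #each_message {|message| ... } ⇒ nil

    Fetches and enumerates the messages in the topics that the consumer group subscribes to.

  • #initialize(cluster:, logger:, group:, offset_manager:, session_timeout:) ⇒ Consumer

    Returns a new instance of Consumer.

  • #shutdown ⇒ nil

    Shuts down the consumer.

  • #subscribe(topic, default_offset: :earliest) ⇒ nil

    Subscribes the consumer to a topic.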

Constructor Details

#initialize(cluster:, logger:, group:, offset_manager:, session_timeout:) ⇒ Consumer

Returns a new instance of Consumer.

# File 'lib/kafka/consumer.rb', line 54

def initialize(cluster:, logger:, group:, offset_manager:, session_timeout:)
  @cluster = cluster
  @logger = logger
  @group = group
  @offset_manager = offset_manager
  @session_timeout = session_timeout

  # Send two heartbeats in each session window, just to be sure.
  @heartbeat_interval = @session_timeout / 2
end
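The heartbeat rule in the constructor (two heartbeats per session window) can be sketched as a small timer. `HeartbeatTimer` and its injectable `clock` are hypothetical names used for illustration; the consumer's own heartbeat logic is not shown on this page and may work differently. Note that the sketch divides by `2.0`, whereas the constructor's `@session_timeout / 2` truncates when given an odd Integer.

```ruby
# Hypothetical sketch: decides when a heartbeat is due for a given session
# timeout (in seconds). The clock is injectable so the logic can be tested.
class HeartbeatTimer
  attr_reader :interval

  def initialize(session_timeout:, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    # Two heartbeats per session window, as in Consumer#initialize.
    @interval = session_timeout / 2.0
    @clock = clock
    @last_heartbeat = @clock.call
  end

  # Runs the given block (which would send the heartbeat) only if at least
  # one interval has passed since the last heartbeat. Returns true if it ran.
  def heartbeat_if_necessary
    now = @clock.call
    return false if now - @last_heartbeat < @interval

    yield
    @last_heartbeat = now
    true
  end
end
```

Calling `heartbeat_if_necessary` after every message, as `each_message` does with its own helper, keeps the check cheap: most calls return immediately without touching the network.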

Instance Method Details

#each_message {|message| ... } ⇒ nil

Fetches and enumerates the messages in the topics that the consumer group subscribes to.

Each message is yielded to the provided block. If the block returns without raising an exception, the message will be considered successfully processed. At regular intervals the offset of the most recent successfully processed message in each partition will be committed to the Kafka offset store. If the consumer crashes or leaves the group, the group member that is tasked with taking over processing of these partitions will resume at the last committed offsets.
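The offset bookkeeping described above can be sketched with a simplified, hypothetical `OffsetTracker`. It is a stand-in for illustration, not the library's OffsetManager, and the `committer` callable replaces the actual write to Kafka's offset store. It assumes the common Kafka convention that the committed offset is the next offset to read, i.e. the last processed offset plus one.

```ruby
# Hypothetical sketch: tracks the latest processed offset per partition and
# commits at most once per commit_interval seconds.
class OffsetTracker
  def initialize(commit_interval:, committer:, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @commit_interval = commit_interval
    @committer = committer # stand-in for the call that writes offsets to Kafka
    @clock = clock
    @processed = {}        # { [topic, partition] => next offset to read }
    @last_commit = @clock.call
  end

  # Record that a message was processed successfully. Storing offset + 1
  # means a new owner of the partition resumes right after this message.
  def mark_as_processed(topic, partition, offset)
    @processed[[topic, partition]] = offset + 1
  end

  # Commit only if the interval has elapsed; returns true if it committed.
  def commit_if_necessary
    return false if @clock.call - @last_commit < @commit_interval

    commit
    true
  end

  def commit
    @committer.call(@processed.dup) unless @processed.empty?
    @last_commit = @clock.call
  end
end
```

Committing on an interval rather than after every message trades fewer round trips to Kafka for the possibility that up to one interval's worth of messages is processed again after a crash.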

Yield Parameters:

  • message

    the message fetched from Kafka; it exposes topic, partition, offset, key and value.

Returns:

  • (nil)

# File 'lib/kafka/consumer.rb', line 96

def each_message
  loop do
    begin
      batch = fetch_batch

      batch.each do |message|
        Instrumentation.instrument("process_message.consumer.kafka") do |notification|
          notification.update(
            topic: message.topic,
            partition: message.partition,
            offset: message.offset,
            key: message.key,
            value: message.value,
          )

          yield message
        end

        @offset_manager.commit_offsets_if_necessary

        send_heartbeat_if_necessary
        mark_message_as_processed(message)
      end
    rescue ConnectionError => e
      @logger.error "Connection error while sending heartbeat; rejoining"
      join_group
    rescue UnknownMemberId
      @logger.error "Kicked out of group; rejoining"
      join_group
    rescue RebalanceInProgress
      @logger.error "Group is rebalancing; rejoining"
      join_group
    rescue IllegalGeneration
      @logger.error "Group has transitioned to a new generation; rejoining"
      join_group
    end
  end
end
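The rescue clauses above follow a recover-and-rejoin pattern: a recoverable group error is logged, the member rejoins the group, and the loop carries on instead of terminating. Below is a bounded, hypothetical sketch of that pattern; `RecoverableGroupError` stands in for the four exception classes rescued above, and plain lambdas stand in for fetching a batch and rejoining the group.

```ruby
# Hypothetical stand-in for ConnectionError, UnknownMemberId,
# RebalanceInProgress and IllegalGeneration.
class RecoverableGroupError < StandardError; end

# Fetches batches and yields each message. On a recoverable error it logs,
# rejoins and continues. max_iterations only exists so this sketch
# terminates; the loop in each_message runs forever.
def consume_with_rejoin(fetch:, rejoin:, max_iterations:, log: ->(msg) { warn msg })
  max_iterations.times do
    begin
      fetch.call.each { |message| yield message }
    rescue RecoverableGroupError => e
      log.call("#{e.message}; rejoining")
      rejoin.call
    end
  end
end
```

Because the rescue wraps the whole batch, an error part-way through a batch abandons the rest of it; after rejoining, the member fetches again from whatever offsets it is then assigned.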

#shutdown ⇒ nil

Shuts down the consumer.

In order to quickly have the consumer group re-balance itself, it's important that members explicitly tell Kafka when they're leaving. Therefore it's a good idea to call this method whenever your consumer is about to quit. If this method is not called, it may take up to the amount of time defined by the session_timeout parameter for Kafka to realize that this consumer is no longer present and trigger a group re-balance. In that period of time, the partitions that used to be assigned to this consumer won't be processed.

Returns:

  • (nil)

# File 'lib/kafka/consumer.rb', line 147

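def shutdown
  @offset_manager.commit_offsets
  @group.leave
rescue ConnectionError
  # If the cluster can't be reached, give up quietly; Kafka will notice
  # that this member is gone once its session times out.
end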

#subscribe(topic, default_offset: :earliest) ⇒ nil

Subscribes the consumer to a topic.

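Typically you either want to start reading messages from the very beginning of the topic's partitions or you simply want to wait for new messages to be written. In the former case, set default_offset to :earliest (the default); in the latter, set it to :latest. The default offset only applies to partitions for which the group has not yet committed an offset; otherwise consumption resumes from the last committed offset.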
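A minimal sketch of how a starting offset could be resolved, assuming the usual Kafka semantics in which a committed offset takes precedence and default_offset is only a fallback. The `starting_offset` helper and its `earliest:`/`latest:` arguments (the partition's first offset and the offset the next written message will get) are hypothetical.

```ruby
# Hypothetical helper: picks the offset a group member starts reading from.
def starting_offset(committed_offset, default_offset, earliest:, latest:)
  # An offset the group has already committed always wins.
  return committed_offset unless committed_offset.nil?

  case default_offset
  when :earliest then earliest # replay everything still retained
  when :latest then latest     # only read messages written from now on
  else
    raise ArgumentError, "default_offset must be :earliest or :latest, got #{default_offset.inspect}"
  end
end
```

With `earliest: 0, latest: 120`, a partition without a committed offset starts at 0 for `:earliest` and at 120 for `:latest`, while a partition with a committed offset of 57 starts at 57 regardless of the setting.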

Parameters:

  • topic (String)

    the name of the topic to subscribe to.

  • default_offset (Symbol) (defaults to: :earliest)

    whether to start from the beginning or the end of the topic's partitions.

Returns:

  • (nil)

# File 'lib/kafka/consumer.rb', line 76

def subscribe(topic, default_offset: :earliest)
  @group.subscribe(topic)
  @offset_manager.set_default_offset(topic, default_offset)

  nil
end