Class: Kafka::Consumer

Inherits:
Object
Defined in:
lib/kafka/consumer.rb

Overview

Note:

This code is still alpha level. Don’t use this for anything important. The API may also change without warning.

A client that consumes messages from a Kafka cluster in coordination with other clients.

A Consumer subscribes to one or more Kafka topics; all consumers with the same group id then agree on who should read from the individual topic partitions. When group members join or leave, the group synchronizes, making sure that each partition is assigned to exactly one member and that all members have some partitions to read from.
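
To make the assignment guarantee concrete, here is a minimal, self-contained sketch that deals partitions out to group members round-robin. The `assign_partitions` helper and the member names are hypothetical and only illustrate the invariant; the real assignment is negotiated within the consumer group and may use a different strategy.

```ruby
# Hypothetical helper: deals partitions out to members round-robin.
# It illustrates the invariant only and is not the library's implementation.
def assign_partitions(members, partitions)
  assignment = members.each_with_object({}) { |member, hash| hash[member] = [] }
  partitions.each_with_index do |partition, index|
    assignment[members[index % members.size]] << partition
  end
  assignment
end

members = ["consumer-a", "consumer-b"]
assignment = assign_partitions(members, [0, 1, 2, 3, 4])

# Every partition appears exactly once across all members.
owned = assignment.values.flatten.sort

# When a member joins, the group synchronizes again and the work is redistributed.
rebalanced = assign_partitions(members + ["consumer-c"], [0, 1, 2, 3, 4])
```

In this sketch a member can end up with no partitions when there are more members than partitions.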

Example

A simple consumer that writes the messages it consumes to the console.

require "kafka"

kafka = Kafka.new(seed_brokers: ["kafka1:9092", "kafka2:9092"])

# Create a new Consumer instance in the group `my-group`:
consumer = kafka.consumer(group_id: "my-group")

# Subscribe to a Kafka topic:
consumer.subscribe("messages")

# Loop forever, reading in messages from all topics that have been
# subscribed to.
consumer.each_message do |message|
  puts message.topic
  puts message.partition
  puts message.key
  puts message.value
  puts message.offset
end


Constructor Details

#initialize(cluster:, logger:, instrumenter:, group:, offset_manager:, session_timeout:, heartbeat:) ⇒ Consumer

Returns a new instance of Consumer.



# File 'lib/kafka/consumer.rb', line 46

def initialize(cluster:, logger:, instrumenter:, group:, offset_manager:, session_timeout:, heartbeat:)
  @cluster = cluster
  @logger = logger
  @instrumenter = instrumenter
  @group = group
  @offset_manager = offset_manager
  @session_timeout = session_timeout
  @heartbeat = heartbeat

  # Whether or not the consumer is currently consuming messages.
  @running = false
end

Instance Method Details

#each_batch(min_bytes: 1, max_wait_time: 5) {|batch| ... } ⇒ nil

Fetches and enumerates the messages in the topics that the consumer group subscribes to.

Each batch of messages is yielded to the provided block. If the block returns without raising an exception, the batch will be considered successfully processed. At regular intervals the offset of the most recent successfully processed message batch in each partition will be committed to the Kafka offset store. If the consumer crashes or leaves the group, the group member that is tasked with taking over processing of these partitions will resume at the last committed offsets.

Parameters:

  • min_bytes (Integer) (defaults to: 1)

    the minimum number of bytes to read before returning messages from the server; if max_wait_time is reached, this is ignored.

  • max_wait_time (Integer) (defaults to: 5)

    the maximum duration of time to wait before returning messages from the server, in seconds.

Yield Parameters:

  • batch

    a batch of messages fetched from a single topic partition.

Returns:

  • (nil)


# File 'lib/kafka/consumer.rb', line 154

def each_batch(min_bytes: 1, max_wait_time: 5)
  consumer_loop do
    batches = fetch_batches(min_bytes: min_bytes, max_wait_time: max_wait_time)

    batches.each do |batch|
      unless batch.empty?
        @instrumenter.instrument("process_batch.consumer") do |notification|
          notification.update(
            topic: batch.topic,
            partition: batch.partition,
            offset_lag: batch.offset_lag,
            highwater_mark_offset: batch.highwater_mark_offset,
            message_count: batch.messages.count,
          )

          yield batch
        end

        mark_message_as_processed(batch.messages.last)
      end

      @offset_manager.commit_offsets_if_necessary

      @heartbeat.send_if_necessary

      return if !@running
    end
  end
end
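
As a rough illustration of what a block passed to `each_batch` might do, the sketch below summarizes one batch. `FakeMessage`, `FakeBatch`, and `summarize_batch` are hypothetical stand-ins so the snippet runs without a broker; they mirror only the `topic`, `partition`, and `messages` readers used in the source above.

```ruby
# Hypothetical stand-ins for the objects yielded by each_batch.
FakeMessage = Struct.new(:key, :value, :offset)
FakeBatch = Struct.new(:topic, :partition, :messages)

# Builds a one-line summary of a batch. each_batch skips empty batches,
# so the handler can assume at least one message is present.
def summarize_batch(batch)
  first = batch.messages.first.offset
  last = batch.messages.last.offset
  "#{batch.topic}/#{batch.partition}: #{batch.messages.size} messages, offsets #{first}..#{last}"
end

batch = FakeBatch.new("messages", 0, [
  FakeMessage.new("k1", "hello", 10),
  FakeMessage.new("k2", "world", 11),
])
summary = summarize_batch(batch)

# With a real consumer this would be wired up roughly as:
#   consumer.each_batch { |batch| puts summarize_batch(batch) }
```

Because the batch is marked as processed only after the block returns, an exception raised inside the block leaves the whole batch unmarked.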

#each_message(min_bytes: 1, max_wait_time: 5) {|message| ... } ⇒ nil

Fetches and enumerates the messages in the topics that the consumer group subscribes to.

Each message is yielded to the provided block. If the block returns without raising an exception, the message will be considered successfully processed. At regular intervals the offset of the most recent successfully processed message in each partition will be committed to the Kafka offset store. If the consumer crashes or leaves the group, the group member that is tasked with taking over processing of these partitions will resume at the last committed offsets.

Parameters:

  • min_bytes (Integer) (defaults to: 1)

    the minimum number of bytes to read before returning messages from the server; if max_wait_time is reached, this is ignored.

  • max_wait_time (Integer) (defaults to: 5)

    the maximum duration of time to wait before returning messages from the server, in seconds.

Yield Parameters:

  • message

    a message fetched from one of the subscribed topics.

Returns:

  • (nil)


# File 'lib/kafka/consumer.rb', line 106

def each_message(min_bytes: 1, max_wait_time: 5)
  consumer_loop do
    batches = fetch_batches(min_bytes: min_bytes, max_wait_time: max_wait_time)

    batches.each do |batch|
      batch.messages.each do |message|
        @instrumenter.instrument("process_message.consumer") do |notification|
          notification.update(
            topic: message.topic,
            partition: message.partition,
            offset: message.offset,
            offset_lag: batch.highwater_mark_offset - message.offset,
            key: message.key,
            value: message.value,
          )

          yield message
        end

        mark_message_as_processed(message)
        @offset_manager.commit_offsets_if_necessary

        @heartbeat.send_if_necessary

        return if !@running
      end
    end
  end
end
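
Since offsets are committed only at intervals, a consumer that takes over after a crash may see messages that were already processed but not yet committed. One way to cope is to make the handler idempotent. The sketch below is a minimal illustration with a hypothetical `StubMessage` struct and `DedupingHandler` class; it keeps the seen set in memory, which is an assumption for brevity, whereas a real handler would need durable storage to survive a restart.

```ruby
require "set"

# Hypothetical stand-in for a yielded message.
StubMessage = Struct.new(:topic, :partition, :offset, :value)

# Hypothetical handler that skips messages it has already handled,
# identified by topic, partition, and offset.
class DedupingHandler
  attr_reader :handled

  def initialize
    @seen = Set.new
    @handled = []
  end

  # Returns true if the message was handled, false if it was a duplicate.
  def call(message)
    id = [message.topic, message.partition, message.offset]
    return false unless @seen.add?(id)

    @handled << message.value
    true
  end
end

handler = DedupingHandler.new
first_run = [StubMessage.new("messages", 0, 41, "a"), StubMessage.new("messages", 0, 42, "b")]
# Simulated redelivery: offset 42 was processed but not committed before a crash.
after_restart = [StubMessage.new("messages", 0, 42, "b"), StubMessage.new("messages", 0, 43, "c")]
(first_run + after_restart).each { |message| handler.call(message) }
```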

#stop ⇒ Object



# File 'lib/kafka/consumer.rb', line 84

def stop
  @running = false
end
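
Judging from the source listings on this page, `stop` only clears a flag that the consuming loop checks after each message or batch, so the item in flight is finished before the loop exits. The toy class below imitates that behaviour; `ToyConsumer` is hypothetical, and setting the flag to true at the start of the loop is an assumption about code not shown here.

```ruby
# Toy stand-in that mirrors only the @running flag; not the library's class.
class ToyConsumer
  def initialize(messages)
    @messages = messages
    @running = false
  end

  def stop
    @running = false
  end

  def each_message
    @running = true # assumption: the real loop sets this when it starts
    @messages.each do |message|
      yield message
      # Checked after the block returns, so the current message completes.
      return unless @running
    end
    nil
  end
end

processed = []
consumer = ToyConsumer.new(%w[a b c d])
consumer.each_message do |message|
  processed << message
  consumer.stop if message == "b"
end
```

With a real consumer, `stop` would more typically be called from outside the block, for example from a signal handler such as `trap("TERM") { consumer.stop }`.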

#subscribe(topic, default_offset: nil, start_from_beginning: true) ⇒ nil

Subscribes the consumer to a topic.

Typically you either want to start reading messages from the very beginning of the topic’s partitions or you simply want to wait for new messages to be written. In the former case, set start_from_beginning to true (the default); in the latter, set it to false.

Parameters:

  • topic (String)

    the name of the topic to subscribe to.

  • default_offset (Symbol) (defaults to: nil)

    whether to start from the beginning or the end of the topic’s partitions. Deprecated.

  • start_from_beginning (Boolean) (defaults to: true)

    whether to start from the beginning of the topic or just subscribe to new messages being produced. This only applies when first consuming a topic partition – once the consumer has checkpointed its progress, it will always resume from the last checkpoint.

Returns:

  • (nil)


# File 'lib/kafka/consumer.rb', line 75

def subscribe(topic, default_offset: nil, start_from_beginning: true)
  default_offset ||= start_from_beginning ? :earliest : :latest

  @group.subscribe(topic)
  @offset_manager.set_default_offset(topic, default_offset)

  nil
end
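
The way the two offset options interact can be checked in isolation. The sketch below copies the first line of `subscribe` into a standalone helper; `resolve_default_offset` is a hypothetical name used only for illustration.

```ruby
# Hypothetical helper mirroring the first line of #subscribe: an explicit
# default_offset wins, otherwise start_from_beginning picks the symbol.
def resolve_default_offset(default_offset: nil, start_from_beginning: true)
  default_offset || (start_from_beginning ? :earliest : :latest)
end

results = {
  defaults: resolve_default_offset,
  new_messages_only: resolve_default_offset(start_from_beginning: false),
  explicit_override: resolve_default_offset(default_offset: :latest, start_from_beginning: true),
}
# results => { defaults: :earliest, new_messages_only: :latest, explicit_override: :latest }
```

An explicit `default_offset` therefore takes precedence over `start_from_beginning`, which matters while the deprecated option is still accepted.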