Class: Kafka::Consumer

Inherits: Object
Defined in:
lib/kafka/consumer.rb

Overview

Note:

This code is still alpha level. Don't use this for anything important. The API may also change without warning.

A client that consumes messages from a Kafka cluster in coordination with other clients.

A Consumer subscribes to one or more Kafka topics; all consumers with the same group id then agree on who should read from the individual topic partitions. When group members join or leave, the group synchronizes, making sure that each partition is assigned to exactly one member and that all members have some partitions to read from.

Example

A simple consumer that writes the messages it consumes to the console.

require "kafka"

kafka = Kafka.new(seed_brokers: ["kafka1:9092", "kafka2:9092"])

# Create a new Consumer instance in the group `my-group`:
consumer = kafka.consumer(group_id: "my-group")

# Subscribe to a Kafka topic:
consumer.subscribe("messages")

# Loop forever, reading in messages from all subscribed topics.
consumer.each_message do |message|
  puts message.topic
  puts message.partition
  puts message.key
  puts message.value
  puts message.offset
end

Instance Method Summary

Constructor Details

#initialize(cluster:, logger:, instrumenter:, group:, offset_manager:, session_timeout:, heartbeat:) ⇒ Consumer

Returns a new instance of Consumer. Consumers are normally created through Kafka#consumer, as shown in the example above, rather than instantiated directly.



# File 'lib/kafka/consumer.rb', line 46

def initialize(cluster:, logger:, instrumenter:, group:, offset_manager:, session_timeout:, heartbeat:)
  @cluster = cluster
  @logger = logger
  @instrumenter = instrumenter
  @group = group
  @offset_manager = offset_manager
  @session_timeout = session_timeout
  @heartbeat = heartbeat

  # A list of partitions that have been paused, per topic.
  @paused_partitions = {}

  # Whether or not the consumer is currently consuming messages.
  @running = false

  # The maximum number of bytes to fetch from a single partition, by topic.
  @max_bytes = {}
end

Instance Method Details

#commit_offsets ⇒ Object

Commits the offsets of all messages that have been marked as processed.



# File 'lib/kafka/consumer.rb', line 283

def commit_offsets
  @offset_manager.commit_offsets
end

#each_batch(min_bytes: 1, max_wait_time: 5, automatically_mark_as_processed: true) {|batch| ... } ⇒ nil

Fetches and enumerates the messages in the topics that the consumer group subscribes to.

Each batch of messages is yielded to the provided block. If the block returns without raising an exception, the batch will be considered successfully processed. At regular intervals the offset of the most recent successfully processed message batch in each partition will be committed to the Kafka offset store. If the consumer crashes or leaves the group, the group member that is tasked with taking over processing of these partitions will resume at the last committed offsets.
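For example, batch processing makes it easy to write messages to a downstream store in bulk. A minimal sketch, assuming a consumer set up as in the overview example (the store_messages helper is hypothetical):

# Fetch and process messages in batches rather than one at a time.
consumer.each_batch(min_bytes: 1024, max_wait_time: 10) do |batch|
  puts "Received #{batch.messages.count} messages from #{batch.topic}/#{batch.partition}"

  # `store_messages` is a hypothetical application-level helper that
  # writes the whole batch to a downstream store in one operation.
  store_messages(batch.messages.map(&:value))
end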

Parameters:

  • min_bytes (Integer) (defaults to: 1)

    the minimum number of bytes to read before returning messages from the server; if max_wait_time is reached, this is ignored.

  • max_wait_time (Integer, Float) (defaults to: 5)

    the maximum duration of time to wait before returning messages from the server, in seconds.

  • automatically_mark_as_processed (Boolean) (defaults to: true)

    whether to automatically mark a batch's messages as successfully processed when the block returns without an exception. Once marked successful, the offsets of processed messages can be committed to Kafka.

Yield Parameters:

  • batch (Kafka::FetchedBatch)

    a batch of messages fetched from a single partition.

Returns:

  • (nil)


# File 'lib/kafka/consumer.rb', line 243

def each_batch(min_bytes: 1, max_wait_time: 5, automatically_mark_as_processed: true)
  consumer_loop do
    batches = fetch_batches(min_bytes: min_bytes, max_wait_time: max_wait_time)

    batches.each do |batch|
      unless batch.empty?
        @instrumenter.instrument("process_batch.consumer") do |notification|
          notification.update(
            topic: batch.topic,
            partition: batch.partition,
            offset_lag: batch.offset_lag,
            highwater_mark_offset: batch.highwater_mark_offset,
            message_count: batch.messages.count,
          )

          begin
            yield batch
          rescue => e
            offset_range = (batch.first_offset .. batch.last_offset)
            location = "#{batch.topic}/#{batch.partition} in offset range #{offset_range}"
            backtrace = e.backtrace.join("\n")

            @logger.error "Exception raised when processing #{location} -- #{e.class}: #{e}\n#{backtrace}"

            raise ProcessingError.new(batch.topic, batch.partition, offset_range)
          end
        end

        mark_message_as_processed(batch.messages.last) if automatically_mark_as_processed
      end

      @offset_manager.commit_offsets_if_necessary

      @heartbeat.send_if_necessary

      return if !@running
    end
  end
end

#each_message(min_bytes: 1, max_wait_time: 5, automatically_mark_as_processed: true) {|message| ... } ⇒ nil

Fetches and enumerates the messages in the topics that the consumer group subscribes to.

Each message is yielded to the provided block. If the block returns without raising an exception, the message will be considered successfully processed. At regular intervals the offset of the most recent successfully processed message in each partition will be committed to the Kafka offset store. If the consumer crashes or leaves the group, the group member that is tasked with taking over processing of these partitions will resume at the last committed offsets.
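When automatic marking is disabled, each message must be marked explicitly before its offset can be committed. A sketch of that pattern, with a hypothetical handle helper:

consumer.each_message(automatically_mark_as_processed: false) do |message|
  # `handle` is a hypothetical application-level helper.
  handle(message.value)

  # Only mark the message once it has been handled successfully, so a
  # crash before this point causes the message to be reprocessed.
  consumer.mark_message_as_processed(message)
end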

Parameters:

  • min_bytes (Integer) (defaults to: 1)

    the minimum number of bytes to read before returning messages from the server; if max_wait_time is reached, this is ignored.

  • max_wait_time (Integer, Float) (defaults to: 5)

    the maximum duration of time to wait before returning messages from the server, in seconds.

  • automatically_mark_as_processed (Boolean) (defaults to: true)

    whether to automatically mark a message as successfully processed when the block returns without an exception. Once marked successful, the offsets of processed messages can be committed to Kafka.

Yield Parameters:

  • message (Kafka::FetchedMessage)

    a message fetched from Kafka.

Returns:

  • (nil)

Raises:

  • (ProcessingError)

    if the block raises an exception while processing a message.



# File 'lib/kafka/consumer.rb', line 179

def each_message(min_bytes: 1, max_wait_time: 5, automatically_mark_as_processed: true)
  consumer_loop do
    batches = fetch_batches(min_bytes: min_bytes, max_wait_time: max_wait_time)

    batches.each do |batch|
      batch.messages.each do |message|
        @instrumenter.instrument("process_message.consumer") do |notification|
          notification.update(
            topic: message.topic,
            partition: message.partition,
            offset: message.offset,
            offset_lag: batch.highwater_mark_offset - message.offset - 1,
            key: message.key,
            value: message.value,
          )

          begin
            yield message
          rescue => e
            location = "#{message.topic}/#{message.partition} at offset #{message.offset}"
            backtrace = e.backtrace.join("\n")
            @logger.error "Exception raised when processing #{location} -- #{e.class}: #{e}\n#{backtrace}"

            raise ProcessingError.new(message.topic, message.partition, message.offset)
          end
        end

        mark_message_as_processed(message) if automatically_mark_as_processed
        @offset_manager.commit_offsets_if_necessary

        @heartbeat.send_if_necessary

        return if !@running
      end
    end

    # We may not have received any messages, but it's still a good idea to
    # commit offsets if we've processed messages in the last set of batches.
    @offset_manager.commit_offsets_if_necessary
  end
end

#mark_message_as_processed(message) ⇒ Object

Marks message as processed, making its offset eligible for the next offset commit.



# File 'lib/kafka/consumer.rb', line 287

def mark_message_as_processed(message)
  @offset_manager.mark_as_processed(message.topic, message.partition, message.offset)
end

#pause(topic, partition, timeout: nil) ⇒ nil

Pause processing of a specific topic partition.

When a specific message causes the processor code to fail, it can be a good idea to simply pause the partition until the error can be resolved, allowing the rest of the partitions to continue being processed.

If the timeout argument is passed, the partition will automatically be resumed when the timeout expires.
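For example, a failing partition can be paused for a fixed period from within the processing loop. A sketch, assuming a hypothetical process helper:

consumer.each_message do |message|
  begin
    # `process` is a hypothetical application-level helper.
    process(message.value)
  rescue
    # Pause this partition for 30 seconds and keep consuming the other
    # partitions; it is resumed automatically when the timeout expires.
    consumer.pause(message.topic, message.partition, timeout: 30)
  end
end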

Parameters:

  • topic (String)
  • partition (Integer)
  • timeout (Integer) (defaults to: nil)

    the number of seconds to pause the partition for, or nil if the partition should not be automatically resumed.

Returns:

  • (nil)


# File 'lib/kafka/consumer.rb', line 111

def pause(topic, partition, timeout: nil)
  @paused_partitions[topic] ||= {}
  @paused_partitions[topic][partition] = timeout && Time.now + timeout
end

#paused?(topic, partition) ⇒ Boolean

Whether the topic partition is currently paused.

Parameters:

  • topic (String)
  • partition (Integer)

Returns:

  • (Boolean)

    true if the partition is paused, false otherwise.

See Also:

  • #pause
  • #resume


# File 'lib/kafka/consumer.rb', line 133

def paused?(topic, partition)
  partitions = @paused_partitions.fetch(topic, {})

  if partitions.key?(partition)
    # Users can set an optional timeout, after which the partition is
    # automatically resumed. When pausing, the timeout is translated to an
    # absolute point in time.
    timeout = partitions.fetch(partition)

    if timeout.nil?
      true
    elsif Time.now < timeout
      true
    else
      @logger.info "Automatically resuming partition #{topic}/#{partition}, pause timeout expired"
      resume(topic, partition)
      false
    end
  end
end

#resume(topic, partition) ⇒ nil

Resume processing of a topic partition.

Parameters:

  • topic (String)
  • partition (Integer)

Returns:

  • (nil)

See Also:

  • #pause


# File 'lib/kafka/consumer.rb', line 122

def resume(topic, partition)
  paused_partitions = @paused_partitions.fetch(topic, {})
  paused_partitions.delete(partition)
end

#send_heartbeat_if_necessary ⇒ Object

Sends a heartbeat to the group coordinator if one is due.



# File 'lib/kafka/consumer.rb', line 291

def send_heartbeat_if_necessary
  @heartbeat.send_if_necessary
end

#stop ⇒ Object

Stops the consumer; the processing loop exits after the current iteration.



# File 'lib/kafka/consumer.rb', line 93

def stop
  @running = false
end

#subscribe(topic, default_offset: nil, start_from_beginning: true, max_bytes_per_partition: 1048576) ⇒ nil

Subscribes the consumer to a topic.

Typically you either want to start reading messages from the very beginning of the topic's partitions or you simply want to wait for new messages to be written. In the former case, set start_from_beginning to true (the default); in the latter, set it to false.
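For example, the two modes look like this (topic names and sizes are illustrative):

# Read the topic's partitions from the very beginning (the default).
consumer.subscribe("page-views")

# Only wait for new messages, and cap fetches from each partition of
# this topic at 512 KB.
consumer.subscribe("click-events", start_from_beginning: false, max_bytes_per_partition: 512 * 1024)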

Parameters:

  • topic (String)

    the name of the topic to subscribe to.

  • default_offset (Symbol) (defaults to: nil)

    whether to start from the beginning or the end of the topic's partitions. Deprecated.

  • start_from_beginning (Boolean) (defaults to: true)

    whether to start from the beginning of the topic or just subscribe to new messages being produced. This only applies when first consuming a topic partition – once the consumer has checkpointed its progress, it will always resume from the last checkpoint.

  • max_bytes_per_partition (Integer) (defaults to: 1048576)

    the maximum amount of data fetched from a single partition at a time.

Returns:

  • (nil)


# File 'lib/kafka/consumer.rb', line 83

def subscribe(topic, default_offset: nil, start_from_beginning: true, max_bytes_per_partition: 1048576)
  default_offset ||= start_from_beginning ? :earliest : :latest

  @group.subscribe(topic)
  @offset_manager.set_default_offset(topic, default_offset)
  @max_bytes[topic] = max_bytes_per_partition

  nil
end