Method: Kafka::Consumer#subscribe

Defined in:
lib/kafka/consumer.rb

#subscribe(topic_or_regex, default_offset: nil, start_from_beginning: true, max_bytes_per_partition: 1048576) ⇒ nil

Subscribes the consumer to a topic.

Typically you either want to start reading messages from the very beginning of the topic's partitions or you simply want to wait for new messages to be written. In the former case, set start_from_beginning to true (the default); in the latter, set it to false.

Parameters:

  • topic_or_regex (String, Regexp)

    subscribe to single topic with a string or multiple topics matching a regex.

  • default_offset (Symbol) (defaults to: nil)

    whether to start from the beginning or the end of the topic's partitions. Deprecated.

  • start_from_beginning (Boolean) (defaults to: true)

    whether to start from the beginning of the topic or just subscribe to new messages being produced. This only applies when first consuming a topic partition – once the consumer has checkpointed its progress, it will always resume from the last checkpoint.

  • max_bytes_per_partition (Integer) (defaults to: 1048576)

    the maximum amount of data fetched from a single partition at a time.

Returns:

  • (nil)


110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/kafka/consumer.rb', line 110

def subscribe(topic_or_regex, default_offset: nil, start_from_beginning: true, max_bytes_per_partition: 1048576)
  default_offset ||= start_from_beginning ? :earliest : :latest

  @subscribed_topics[topic_or_regex] = {
    default_offset: default_offset,
    start_from_beginning: start_from_beginning,
    max_bytes_per_partition: max_bytes_per_partition
  }
  scan_for_subscribing

  nil
end