Class: Kafka::Client

Inherits:
Object
Defined in:
lib/kafka/client.rb

Constant Summary

DEFAULT_CLIENT_ID =
"ruby-kafka"
DEFAULT_LOGGER =
Logger.new("/dev/null")

Constructor Details

#initialize(seed_brokers:, client_id: DEFAULT_CLIENT_ID, logger: DEFAULT_LOGGER, connect_timeout: nil, socket_timeout: nil) ⇒ Client

Initializes a new Kafka client.

Parameters:

  • seed_brokers (Array<String>)

    the list of brokers used to initialize the client.

  • client_id (String) (defaults to: DEFAULT_CLIENT_ID)

    the identifier for this application.

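  • logger (Logger) (defaults to: DEFAULT_LOGGER)

    the logger that should be used by the client. The default writes to /dev/null, discarding all log output.
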
  • connect_timeout (Integer, nil) (defaults to: nil)

    the timeout setting for connecting to brokers. See BrokerPool#initialize.

  • socket_timeout (Integer, nil) (defaults to: nil)

    the timeout setting for socket connections. See BrokerPool#initialize.

# File 'lib/kafka/client.rb', line 28

def initialize(seed_brokers:, client_id: DEFAULT_CLIENT_ID, logger: DEFAULT_LOGGER, connect_timeout: nil, socket_timeout: nil)
  @logger = logger

  broker_pool = BrokerPool.new(
    client_id: client_id,
    connect_timeout: connect_timeout,
    socket_timeout: socket_timeout,
    logger: logger,
  )

  @cluster = Cluster.new(
    seed_brokers: seed_brokers,
    broker_pool: broker_pool,
    logger: logger,
  )
end
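
Because DEFAULT_LOGGER writes to /dev/null, the client is silent unless you pass your own logger. The sketch below is a minimal example that uses only Ruby's standard Logger and StringIO to capture log output in memory. The Kafka::Client.new call appears only as a comment because it needs the ruby-kafka gem, and the broker address and client id in it are placeholders.

```ruby
require "logger"
require "stringio"

# Keep log output in memory instead of discarding it
# (DEFAULT_LOGGER writes to /dev/null).
log_output = StringIO.new
logger = Logger.new(log_output)
logger.level = Logger::INFO

# With the ruby-kafka gem loaded, the logger would be handed to the client
# like this; "localhost:9092" and "my-app" are placeholder values:
#
#   kafka = Kafka::Client.new(
#     seed_brokers: ["localhost:9092"],
#     client_id: "my-app",
#     logger: logger,
#   )

logger.info("example message")
puts log_output.string
```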

Instance Method Details

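#async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: 1000, **options) ⇒ Kafka::AsyncProducer

Builds a new asynchronous producer. The delivery_interval, delivery_threshold and max_queue_size options are passed to AsyncProducer#initialize. All remaining options are passed to #producer, which builds the synchronous producer that the asynchronous producer wraps.

Returns:

  • (Kafka::AsyncProducer)

    the new asynchronous producer.
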
# File 'lib/kafka/client.rb', line 55

def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: 1000, **options)
  sync_producer = producer(**options)

  AsyncProducer.new(
    sync_producer: sync_producer,
    delivery_interval: delivery_interval,
    delivery_threshold: delivery_threshold,
    max_queue_size: max_queue_size,
  )
end
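
#async_producer names delivery_interval, delivery_threshold and max_queue_size as keywords and collects everything else in **options, so Ruby routes each option to one side or the other. The stand-in below is hypothetical and not part of ruby-kafka. It reproduces only that signature and returns the two option groups instead of building producers. :some_producer_option is a placeholder for any option #producer accepts.

```ruby
# Hypothetical stand-in with the same keyword signature as #async_producer.
# Instead of building producers, it returns the two groups of options.
def split_async_producer_options(delivery_interval: 0, delivery_threshold: 0, max_queue_size: 1000, **options)
  async_options = {
    delivery_interval: delivery_interval,
    delivery_threshold: delivery_threshold,
    max_queue_size: max_queue_size,
  }
  # `options` holds every keyword not named above, i.e. what #producer receives.
  [async_options, options]
end

# :some_producer_option is a placeholder, not a real ruby-kafka option.
async_options, producer_options =
  split_async_producer_options(delivery_interval: 10, some_producer_option: true)
```

Unset keywords keep their defaults, so async_options here holds delivery_interval 10 together with the default delivery_threshold and max_queue_size.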

#close ⇒ Object

Disconnects the client from all brokers in the cluster.

# File 'lib/kafka/client.rb', line 155

def close
  @cluster.disconnect
end
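
#close releases the client's broker connections, so a common Ruby pattern is to call it from an ensure clause. That way it runs even when a request fails. The sketch below assumes a hypothetical ClosableStub in place of Kafka::Client so it runs without a broker. Its #topics simply raises to simulate a failure.

```ruby
# Hypothetical stub standing in for Kafka::Client; it only records whether
# #close was called and simulates a failing request.
class ClosableStub
  attr_reader :closed

  def initialize
    @closed = false
  end

  def topics
    raise IOError, "simulated broker failure"
  end

  def close
    @closed = true
  end
end

kafka = ClosableStub.new

begin
  kafka.topics
rescue IOError => e
  warn "request failed: #{e.message}"
ensure
  # Runs whether or not the request raised, so connections are always released.
  kafka.close
end
```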

#fetch_messages(topic:, partition:, offset: :latest, max_wait_time: 5, min_bytes: 1, max_bytes: 1048576) ⇒ Array<Kafka::FetchedMessage>

Note:

This API is still at alpha level. Don’t use it in production.

Fetches a batch of messages from a single partition. Note that it’s possible to get back empty batches.

The starting point for the fetch can be configured with the :offset argument. If you pass a number, the fetch will start at that offset. However, there are two special Symbol values that can be passed instead:

  • :earliest — the first offset in the partition.
  • :latest — the next offset that will be written to, effectively making the call block until there is a new message in the partition.

The Kafka protocol specifies the numeric values of these two options: -2 and -1, respectively. You can also pass in these numbers directly.
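
The mapping between the two symbols and their protocol values fits in a small lookup. resolve_offset and SPECIAL_OFFSETS below are hypothetical names used for illustration, not ruby-kafka's own code.

```ruby
# Hypothetical lookup of the symbolic offsets and their Kafka protocol values.
SPECIAL_OFFSETS = { earliest: -2, latest: -1 }.freeze

# Returns the numeric offset for a Symbol or Integer offset argument.
def resolve_offset(offset)
  case offset
  when Symbol then SPECIAL_OFFSETS.fetch(offset) # KeyError for unknown symbols
  when Integer then offset                       # numbers pass through unchanged
  else raise ArgumentError, "invalid offset: #{offset.inspect}"
  end
end

resolve_offset(:earliest) # => -2
resolve_offset(:latest)   # => -1
resolve_offset(42)        # => 42
```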

Example

When enumerating the messages in a partition, you typically fetch batches sequentially.

offset = :earliest

loop do
  messages = kafka.fetch_messages(
    topic: "my-topic",
    partition: 42,
    offset: offset,
  )

  messages.each do |message|
    puts message.offset, message.key, message.value

    # Set the next offset that should be read to be the subsequent
    # offset.
    offset = message.offset + 1
  end
end

See a working example in examples/simple-consumer.rb.
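
The sketch below shows how the offset advances without a running cluster. It replaces the client with a hypothetical in-memory FakeClient, and FakeMessage stands in for Kafka::FetchedMessage. FakeClient accepts the same topic:, partition: and offset: keywords. The loop above runs forever, whereas this one stops at the first empty batch so that it terminates. Against a real cluster an empty batch does not mean the partition is exhausted, so a consumer would normally keep polling.

```ruby
# Hypothetical stand-in for Kafka::FetchedMessage.
FakeMessage = Struct.new(:offset, :key, :value)

# Hypothetical in-memory client for a single partition; no broker involved.
class FakeClient
  def initialize(values, batch_size: 2)
    @log = values.each_with_index.map { |value, i| FakeMessage.new(i, nil, value) }
    @batch_size = batch_size
  end

  # Returns up to batch_size messages starting at the requested offset.
  def fetch_messages(topic:, partition:, offset: :latest)
    start = case offset
            when :earliest then 0
            when :latest then @log.size
            else offset
            end
    @log[start, @batch_size] || []
  end
end

kafka = FakeClient.new(%w[a b c d e])
offset = :earliest
seen = []

loop do
  messages = kafka.fetch_messages(topic: "my-topic", partition: 42, offset: offset)
  break if messages.empty? # only for the sketch; see the note above

  messages.each do |message|
    seen << message.value
    # The next fetch starts right after the last message read.
    offset = message.offset + 1
  end
end
```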

Parameters:

  • topic (String)

    the topic that messages should be fetched from.

  • partition (Integer)

    the partition that messages should be fetched from.

  • offset (Integer, Symbol) (defaults to: :latest)

    the offset to start reading from. Default is the latest offset.

  • max_wait_time (Integer) (defaults to: 5)

    the maximum amount of time to wait before the server responds, in seconds.

  • min_bytes (Integer) (defaults to: 1)

    the minimum number of bytes to wait for. If set to zero, the broker will respond immediately, but the response may be empty. The default is 1 byte, which means that the broker will respond as soon as a message is written to the partition.

  • max_bytes (Integer) (defaults to: 1048576)

    the maximum number of bytes to include in the response message set. Default is 1 MB. You need to set this higher if you expect messages to be larger than this.
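
max_wait_time and min_bytes together decide when the broker answers. It responds as soon as at least min_bytes are available, or after max_wait_time seconds with whatever it has, possibly nothing. The predicate below is a simplified, hypothetical model of that rule, not ruby-kafka code, and it ignores max_bytes.

```ruby
# Hypothetical model: the broker answers once enough data is available
# or once the maximum wait time has elapsed, whichever comes first.
def broker_responds?(available_bytes:, waited_seconds:, min_bytes: 1, max_wait_time: 5)
  available_bytes >= min_bytes || waited_seconds >= max_wait_time
end

# min_bytes: 0 answers immediately, even with no data (an empty batch).
broker_responds?(available_bytes: 0, waited_seconds: 0, min_bytes: 0)
# The default min_bytes: 1 keeps waiting until a message arrives...
broker_responds?(available_bytes: 0, waited_seconds: 2)
# ...or until max_wait_time runs out.
broker_responds?(available_bytes: 0, waited_seconds: 5)
```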

Returns:

  • (Array<Kafka::FetchedMessage>)

    the fetched messages. The array may be empty.

# File 'lib/kafka/client.rb', line 127

def fetch_messages(topic:, partition:, offset: :latest, max_wait_time: 5, min_bytes: 1, max_bytes: 1048576)
  operation = FetchOperation.new(
    cluster: @cluster,
    logger: @logger,
    min_bytes: min_bytes,
    max_wait_time: max_wait_time,
  )

  operation.fetch_from_partition(topic, partition, offset: offset, max_bytes: max_bytes)

  operation.execute
end

#partitions_for(topic) ⇒ Integer

Counts the number of partitions in a topic.

Parameters:

  • topic (String)

    the name of the topic.

Returns:

  • (Integer)

    the number of partitions in the topic.

# File 'lib/kafka/client.rb', line 151

def partitions_for(topic)
  @cluster.partitions_for(topic).count
end

#producer(**options) ⇒ Kafka::Producer

Builds a new producer.

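The options are passed to Producer#initialize, together with the client's cluster and logger.

Returns:

  • (Kafka::Producer)

    the new producer.

See Also:

  • Producer#initialize
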
# File 'lib/kafka/client.rb', line 51

def producer(**options)
  Producer.new(cluster: @cluster, logger: @logger, **options)
end

#topics ⇒ Array<String>

Lists all topics in the cluster.

Returns:

  • (Array<String>)

    the list of topic names.

# File 'lib/kafka/client.rb', line 143

def topics
  @cluster.topics
end
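
#topics and #partitions_for combine naturally into an overview of the cluster. The sketch below uses a hypothetical MetadataStub in place of Kafka::Client so it runs without a broker. With a real client, the same two method calls would be made on the Kafka::Client instance. The topic names and partition counts here are made up.

```ruby
# Hypothetical stub standing in for Kafka::Client; it models only #topics and
# #partitions_for, using a fixed in-memory layout instead of the cluster.
class MetadataStub
  def initialize(layout)
    @layout = layout
  end

  def topics
    @layout.keys
  end

  def partitions_for(topic)
    @layout.fetch(topic)
  end
end

kafka = MetadataStub.new("orders" => 3, "payments" => 1)

# Map every topic name to its partition count.
overview = kafka.topics.each_with_object({}) do |topic, counts|
  counts[topic] = kafka.partitions_for(topic)
end

# Kafka numbers partitions from 0, so these are the valid `partition:`
# values for #fetch_messages on "orders".
order_partitions = (0...overview.fetch("orders")).to_a
```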