Class: Kafka::Client

Inherits: Object
Defined in: lib/kafka/client.rb

Constructor Details

#initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_timeout: nil, socket_timeout: nil) ⇒ Client

Initializes a new Kafka client.

Parameters:

  • seed_brokers (Array<String>)

    the list of brokers used to initialize the client.

  • client_id (String) (defaults to: "ruby-kafka")

    the identifier for this application.

  • logger (Logger) (defaults to: nil)

    the logger that should be used by the client.

  • connect_timeout (Integer, nil) (defaults to: nil)

    the timeout setting for connecting to brokers. See BrokerPool#initialize.

  • socket_timeout (Integer, nil) (defaults to: nil)

    the timeout setting for socket connections. See BrokerPool#initialize.



# File 'lib/kafka/client.rb', line 27

def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_timeout: nil, socket_timeout: nil)
  @logger = logger || Logger.new("/dev/null")

  broker_pool = BrokerPool.new(
    client_id: client_id,
    connect_timeout: connect_timeout,
    socket_timeout: socket_timeout,
    logger: logger,
  )

  @cluster = Cluster.new(
    seed_brokers: seed_brokers,
    broker_pool: broker_pool,
    logger: logger,
  )
end
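
A minimal usage sketch (the broker addresses and client ID below are placeholders, not defaults):

require "kafka"
require "logger"

# Connect to a cluster via two seed brokers; the full broker list is
# discovered from cluster metadata.
kafka = Kafka::Client.new(
  seed_brokers: ["kafka1:9092", "kafka2:9092"],
  client_id: "my-application",
  logger: Logger.new($stderr),
)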

Instance Method Details

#async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: 1000, **options) ⇒ AsyncProducer

Creates a new AsyncProducer instance.

All the parameters allowed by #producer can be passed; in addition, an async producer accepts a few extra parameters of its own.

Parameters:

  • max_queue_size (Integer) (defaults to: 1000)

    the maximum number of messages allowed in the queue.

  • delivery_threshold (Integer) (defaults to: 0)

    if greater than zero, the number of buffered messages that will automatically trigger a delivery.

  • delivery_interval (Integer) (defaults to: 0)

    if greater than zero, the number of seconds between automatic message deliveries.

Returns:

  • (AsyncProducer)

See Also:

  • #producer



# File 'lib/kafka/client.rb', line 68

def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: 1000, **options)
  sync_producer = producer(**options)

  AsyncProducer.new(
    sync_producer: sync_producer,
    delivery_interval: delivery_interval,
    delivery_threshold: delivery_threshold,
    max_queue_size: max_queue_size,
  )
end
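
As an illustrative sketch, an async producer that flushes at least every 10 seconds or whenever 100 messages are buffered (the topic name is a placeholder, and the produce/shutdown calls are assumed to follow the AsyncProducer API):

producer = kafka.async_producer(
  delivery_interval: 10,   # deliver at least every 10 seconds...
  delivery_threshold: 100, # ...or once 100 messages are buffered.
)

# Writes are buffered and delivered by a background thread.
producer.produce("hello", topic: "greetings")

# Flush any pending messages and release resources.
producer.shutdown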

#close ⇒ nil

Closes all connections to the Kafka brokers and frees up used resources.

Returns:

  • (nil)


# File 'lib/kafka/client.rb', line 185

def close
  @cluster.disconnect
end
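
A common pattern (sketch) is to close the client in an ensure block so broker connections are released even when an error is raised:

begin
  kafka.topics.each { |topic| puts topic }
ensure
  # Always disconnect from the cluster, even on failure.
  kafka.close
end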

#consumer(**options) ⇒ Consumer

Creates a new Consumer instance.

options are passed to Kafka::Consumer#initialize.

Returns:

  • (Consumer)

See Also:

  • Kafka::Consumer#initialize



# File 'lib/kafka/client.rb', line 85

def consumer(**options)
  Consumer.new(
    cluster: @cluster,
    logger: @logger,
    **options,
  )
end
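
A rough sketch of a consuming loop; the group_id option is passed through to Kafka::Consumer#initialize, and the subscribe/each_message calls are assumptions about the Consumer API:

# group_id, subscribe, and each_message are assumed Consumer features.
consumer = kafka.consumer(group_id: "my-group")
consumer.subscribe("greetings")

consumer.each_message do |message|
  puts message.topic, message.partition, message.offset, message.value
end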

#fetch_messages(topic:, partition:, offset: :latest, max_wait_time: 5, min_bytes: 1, max_bytes: 1048576) ⇒ Array<Kafka::FetchedMessage>

Note:

This API is still alpha level. Don't try to use it in production.

Fetches a batch of messages from a single partition. Note that it's possible to get back empty batches.

The starting point for the fetch can be configured with the :offset argument. If you pass a number, the fetch will start at that offset. However, there are two special Symbol values that can be passed instead:

  • :earliest — the first offset in the partition.
  • :latest — the next offset that will be written to, effectively making the call block until there is a new message in the partition.

The Kafka protocol specifies the numeric values of these two options: -2 and -1, respectively. You can also pass in these numbers directly.

Example

When enumerating the messages in a partition, you typically fetch batches sequentially.

offset = :earliest

loop do
  messages = kafka.fetch_messages(
    topic: "my-topic",
    partition: 42,
    offset: offset,
  )

  messages.each do |message|
    puts message.offset, message.key, message.value

    # Advance the offset so the next fetch continues after this
    # message.
    offset = message.offset + 1
  end
end

See a working example in examples/simple-consumer.rb.

Parameters:

  • topic (String)

    the topic that messages should be fetched from.

  • partition (Integer)

    the partition that messages should be fetched from.

  • offset (Integer, Symbol) (defaults to: :latest)

    the offset to start reading from. Default is the latest offset.

  • max_wait_time (Integer) (defaults to: 5)

    the maximum amount of time to wait before the server responds, in seconds.

  • min_bytes (Integer) (defaults to: 1)

    the minimum number of bytes to wait for. If set to zero, the broker will respond immediately, but the response may be empty. The default is 1 byte, which means that the broker will respond as soon as a message is written to the partition.

  • max_bytes (Integer) (defaults to: 1048576)

    the maximum number of bytes to include in the response message set. Default is 1 MB. You need to set this higher if you expect messages to be larger than this.

Returns:

  • (Array<Kafka::FetchedMessage>)



# File 'lib/kafka/client.rb', line 154

def fetch_messages(topic:, partition:, offset: :latest, max_wait_time: 5, min_bytes: 1, max_bytes: 1048576)
  operation = FetchOperation.new(
    cluster: @cluster,
    logger: @logger,
    min_bytes: min_bytes,
    max_wait_time: max_wait_time,
  )

  operation.fetch_from_partition(topic, partition, offset: offset, max_bytes: max_bytes)

  operation.execute
end

#partitions_for(topic) ⇒ Integer

Counts the number of partitions in a topic.

Parameters:

  • topic (String)

Returns:

  • (Integer)

    the number of partitions in the topic.



# File 'lib/kafka/client.rb', line 178

def partitions_for(topic)
  @cluster.partitions_for(topic).count
end
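
For example (the topic name and partition count are illustrative):

kafka.partitions_for("my-topic") #=> 3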

#producer(**options) ⇒ Kafka::Producer

Builds a new producer.

options are passed to Producer#initialize.

Returns:

  • (Kafka::Producer)

See Also:

  • Producer#initialize



# File 'lib/kafka/client.rb', line 50

def producer(**options)
  Producer.new(cluster: @cluster, logger: @logger, **options)
end
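
A sketch of the synchronous flow: messages are buffered locally with produce and only written to the cluster when deliver_messages is called (the topic, messages, and the shutdown cleanup call are assumptions about the Producer API):

producer = kafka.producer

producer.produce("hello", topic: "greetings")
producer.produce("world", topic: "greetings")

# Send the buffered messages to the partition leaders.
producer.deliver_messages

# Assumed cleanup call to release the producer's resources.
producer.shutdown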

#topics ⇒ Array<String>

Lists all topics in the cluster.

Returns:

  • (Array<String>)

    the list of topic names.



# File 'lib/kafka/client.rb', line 170

def topics
  @cluster.topics
end
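
For example (the topic names are illustrative):

kafka.topics #=> ["greetings", "page-views"]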