Class: Kafka::Client

Inherits: Object
Defined in: lib/kafka/client.rb
Instance Method Summary
- #async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: 1000, **options) ⇒ AsyncProducer
  Creates a new AsyncProducer instance.
- #close ⇒ nil
  Closes all connections to the Kafka brokers and frees up used resources.
- #consumer(**options) ⇒ Consumer
  Creates a new Consumer instance.
- #fetch_messages(topic:, partition:, offset: :latest, max_wait_time: 5, min_bytes: 1, max_bytes: 1048576) ⇒ Array<Kafka::FetchedMessage>
  Fetches a batch of messages from a single partition.
- #initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_timeout: nil, socket_timeout: nil) ⇒ Client (constructor)
  Initializes a new Kafka client.
- #partitions_for(topic) ⇒ Integer
  Counts the number of partitions in a topic.
- #producer(**options) ⇒ Kafka::Producer
  Builds a new producer.
- #topics ⇒ Array<String>
  Lists all topics in the cluster.
Constructor Details
#initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_timeout: nil, socket_timeout: nil) ⇒ Client
Initializes a new Kafka client.
# File 'lib/kafka/client.rb', line 27

def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_timeout: nil, socket_timeout: nil)
  @logger = logger || Logger.new("/dev/null")

  broker_pool = BrokerPool.new(
    client_id: client_id,
    connect_timeout: connect_timeout,
    socket_timeout: socket_timeout,
    logger: logger,
  )

  @cluster = Cluster.new(
    seed_brokers: seed_brokers,
    broker_pool: broker_pool,
    logger: logger,
  )
end
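For example, a client might be constructed like this (a sketch; the broker addresses, client id, and logger destination are illustrative placeholders):

require "kafka"
require "logger"

kafka = Kafka::Client.new(
  seed_brokers: ["kafka1:9092", "kafka2:9092"],  # hypothetical broker addresses
  client_id: "my-app",
  logger: Logger.new($stderr),
  connect_timeout: 10,  # seconds to wait when opening a connection
  socket_timeout: 30,   # seconds to wait for a response
)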
Instance Method Details
#async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: 1000, **options) ⇒ AsyncProducer
Creates a new AsyncProducer instance.
All parameters allowed by #producer can be passed; in addition, a few parameters specific to the async producer are available.
# File 'lib/kafka/client.rb', line 68

def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: 1000, **options)
  sync_producer = producer(**options)

  AsyncProducer.new(
    sync_producer: sync_producer,
    delivery_interval: delivery_interval,
    delivery_threshold: delivery_threshold,
    max_queue_size: max_queue_size,
  )
end
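As a sketch of typical usage, assuming AsyncProducer exposes #produce and #shutdown as in ruby-kafka's README (the topic name is illustrative):

producer = kafka.async_producer(
  delivery_interval: 10,    # deliver buffered messages every 10 seconds...
  delivery_threshold: 100,  # ...or as soon as 100 messages are buffered.
)

producer.produce("hello", topic: "greetings")  # returns immediately

# Deliver anything still buffered and stop the background thread.
producer.shutdown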
#close ⇒ nil
Closes all connections to the Kafka brokers and frees up used resources.
# File 'lib/kafka/client.rb', line 185

def close
  @cluster.disconnect
end
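A common pattern is to close the client in an ensure block so connections are released even if an error is raised (a sketch; the topic listing is only for illustration):

kafka = Kafka::Client.new(seed_brokers: ["kafka1:9092"])

begin
  puts kafka.topics
ensure
  kafka.close
end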
#consumer(**options) ⇒ Consumer
Creates a new Consumer instance.
options are passed to Kafka::Consumer#initialize.
# File 'lib/kafka/client.rb', line 85

def consumer(**options)
  Consumer.new(
    cluster: @cluster,
    logger: @logger,
    **options,
  )
end
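A sketch of how a consumer might be used, assuming the subscribe/each_message API from ruby-kafka's README; the group and topic names are hypothetical, and the exact options accepted are defined by Kafka::Consumer#initialize:

consumer = kafka.consumer(group_id: "my-group")
consumer.subscribe("greetings")

consumer.each_message do |message|
  puts message.topic, message.partition, message.offset, message.value
end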
#fetch_messages(topic:, partition:, offset: :latest, max_wait_time: 5, min_bytes: 1, max_bytes: 1048576) ⇒ Array<Kafka::FetchedMessage>
This API is still alpha level. Don't try to use it in production.
Fetches a batch of messages from a single partition. Note that it's possible to get back empty batches.
The starting point for the fetch can be configured with the :offset argument.
If you pass a number, the fetch will start at that offset. However, there are
two special Symbol values that can be passed instead:
- :earliest — the first offset in the partition.
- :latest — the next offset that will be written to, effectively making the call block until there is a new message in the partition.
The Kafka protocol specifies the numeric values of these two options: -2 and -1, respectively. You can also pass in these numbers directly.
Example
When enumerating the messages in a partition, you typically fetch batches sequentially.
offset = :earliest

loop do
  messages = kafka.fetch_messages(
    topic: "my-topic",
    partition: 42,
    offset: offset,
  )

  messages.each do |message|
    puts message.offset, message.key, message.value

    # Set the next offset that should be read to be the subsequent
    # offset.
    offset = message.offset + 1
  end
end
See a working example in examples/simple-consumer.rb.
# File 'lib/kafka/client.rb', line 154

def fetch_messages(topic:, partition:, offset: :latest, max_wait_time: 5, min_bytes: 1, max_bytes: 1048576)
  operation = FetchOperation.new(
    cluster: @cluster,
    logger: @logger,
    min_bytes: min_bytes,
    max_wait_time: max_wait_time,
  )

  operation.fetch_from_partition(topic, partition, offset: offset, max_bytes: max_bytes)

  operation.execute
end
#partitions_for(topic) ⇒ Integer
Counts the number of partitions in a topic.
# File 'lib/kafka/client.rb', line 178

def partitions_for(topic)
  @cluster.partitions_for(topic).count
end
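For instance, the partition count can be used to fan a fetch out over every partition of a topic (a sketch built on #fetch_messages above; "events" is a hypothetical topic name):

count = kafka.partitions_for("events")

# Fetch a batch from each partition of the topic in turn.
(0...count).each do |partition|
  messages = kafka.fetch_messages(topic: "events", partition: partition, offset: :earliest)
  puts "partition #{partition}: #{messages.size} messages"
end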
#producer(**options) ⇒ Kafka::Producer
Builds a new producer.
options are passed to Producer#initialize.
# File 'lib/kafka/client.rb', line 50

def producer(**options)
  Producer.new(cluster: @cluster, logger: @logger, **options)
end
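A sketch of synchronous producer usage, assuming Producer exposes #produce, #deliver_messages, and #shutdown as in ruby-kafka's README; the topic name is illustrative:

producer = kafka.producer

producer.produce("hello", topic: "greetings")

# Nothing is written to the brokers until this call.
producer.deliver_messages

producer.shutdown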
#topics ⇒ Array<String>
Lists all topics in the cluster.
# File 'lib/kafka/client.rb', line 170

def topics
  @cluster.topics
end
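A minimal illustration using only the documented API:

# Print every topic the cluster knows about.
kafka.topics.each { |topic| puts topic }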