Class: Kafka::Client
- Inherits: Object
- Defined in: lib/kafka/client.rb
Constant Summary
- DEFAULT_CLIENT_ID = "ruby-kafka"
- DEFAULT_LOGGER = Logger.new("/dev/null")
Instance Method Summary
- #async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: 1000, **options) ⇒ Object
- #close ⇒ Object
- #fetch_messages(topic:, partition:, offset: :latest, max_wait_time: 5, min_bytes: 1, max_bytes: 1048576) ⇒ Array<Kafka::FetchedMessage>
  Fetches a batch of messages from a single partition.
- #initialize(seed_brokers:, client_id: DEFAULT_CLIENT_ID, logger: DEFAULT_LOGGER, connect_timeout: nil, socket_timeout: nil) ⇒ Client (constructor)
  Initializes a new Kafka client.
- #partitions_for(topic) ⇒ Integer
  Counts the number of partitions in a topic.
- #producer(**options) ⇒ Kafka::Producer
  Builds a new producer.
- #topics ⇒ Array<String>
  Lists all topics in the cluster.
Constructor Details
#initialize(seed_brokers:, client_id: DEFAULT_CLIENT_ID, logger: DEFAULT_LOGGER, connect_timeout: nil, socket_timeout: nil) ⇒ Client
Initializes a new Kafka client.
    # File 'lib/kafka/client.rb', line 28

    def initialize(seed_brokers:, client_id: DEFAULT_CLIENT_ID, logger: DEFAULT_LOGGER, connect_timeout: nil, socket_timeout: nil)
      @logger = logger

      broker_pool = BrokerPool.new(
        client_id: client_id,
        connect_timeout: connect_timeout,
        socket_timeout: socket_timeout,
        logger: logger,
      )

      @cluster = Cluster.new(
        seed_brokers: seed_brokers,
        broker_pool: broker_pool,
        logger: logger,
      )
    end
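As the constructor body above shows, initializing a client only builds the broker pool and cluster objects; connections are opened lazily on first use. A minimal usage sketch, assuming the ruby-kafka gem is installed; the broker addresses and client id below are placeholders:

```ruby
require "logger"
require "kafka"

logger = Logger.new($stderr)

kafka = Kafka::Client.new(
  seed_brokers: ["kafka1.example.com:9092", "kafka2.example.com:9092"],
  client_id: "my-app",
  logger: logger,
  connect_timeout: 10,  # seconds to wait when opening a broker connection
  socket_timeout: 30,   # seconds to wait on a blocked socket read or write
)
```

Only a subset of the brokers needs to be listed in seed_brokers; the client discovers the rest of the cluster from whichever seed broker it reaches first.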
Instance Method Details
#async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: 1000, **options) ⇒ Object
    # File 'lib/kafka/client.rb', line 55

    def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: 1000, **options)
      sync_producer = producer(**options)

      AsyncProducer.new(
        sync_producer: sync_producer,
        delivery_interval: delivery_interval,
        delivery_threshold: delivery_threshold,
        max_queue_size: max_queue_size,
      )
    end
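The async producer buffers messages and delivers them in batches, flushing when delivery_threshold messages have accumulated or delivery_interval seconds have passed (a value of 0 disables that trigger). A simplified pure-Ruby sketch of the threshold trigger; the class and method names below are illustrative, not the gem's internals:

```ruby
# Illustrative sketch of threshold-based flushing, not ruby-kafka's
# actual AsyncProducer implementation.
class ThresholdBuffer
  attr_reader :flushes

  def initialize(delivery_threshold:)
    @delivery_threshold = delivery_threshold
    @queue = []
    @flushes = []
  end

  def produce(message)
    @queue << message
    # A threshold of 0 disables this trigger, matching the documented default.
    flush if @delivery_threshold > 0 && @queue.size >= @delivery_threshold
  end

  def flush
    @flushes << @queue.dup
    @queue.clear
  end
end

buffer = ThresholdBuffer.new(delivery_threshold: 2)
buffer.produce("a")   # queued; below threshold
buffer.produce("b")   # threshold reached; both messages flushed as one batch
buffer.produce("c")   # queued again, awaiting the next trigger
```

The real AsyncProducer runs the flushing logic in a background thread so that #produce never blocks on network I/O.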
#close ⇒ Object
    # File 'lib/kafka/client.rb', line 155

    def close
      @cluster.disconnect
    end
#fetch_messages(topic:, partition:, offset: :latest, max_wait_time: 5, min_bytes: 1, max_bytes: 1048576) ⇒ Array<Kafka::FetchedMessage>
This API is still alpha level. Don’t try to use it in production.
Fetches a batch of messages from a single partition. Note that it’s possible to get back empty batches.
The starting point for the fetch can be configured with the :offset argument. If you pass a number, the fetch will start at that offset. However, there are two special Symbol values that can be passed instead:
- :earliest: the first offset in the partition.
- :latest: the next offset that will be written to, effectively making the call block until there is a new message in the partition.
The Kafka protocol specifies the numeric values of these two options: -2 and -1, respectively. You can also pass in these numbers directly.
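Because the symbols are just aliases for the protocol's numeric codes, the two spellings are interchangeable. A small sketch of the mapping; the helper name is made up for illustration:

```ruby
# Protocol-level offset codes, as documented above: -2 is earliest, -1 is latest.
OFFSET_CODES = { earliest: -2, latest: -1 }.freeze

# Hypothetical helper: translate a symbolic offset into its numeric code,
# passing explicit numeric offsets through unchanged.
def resolve_offset(offset)
  OFFSET_CODES.fetch(offset, offset)
end

resolve_offset(:earliest)  # => -2
resolve_offset(:latest)    # => -1
resolve_offset(1234)       # => 1234
```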
Example
When enumerating the messages in a partition, you typically fetch batches sequentially.
    offset = :earliest

    loop do
      messages = kafka.fetch_messages(
        topic: "my-topic",
        partition: 42,
        offset: offset,
      )

      messages.each do |message|
        puts message.offset, message.key, message.value

        # Set the next offset that should be read to be the subsequent offset.
        offset = message.offset + 1
      end
    end
See a working example in examples/simple-consumer.rb.
    # File 'lib/kafka/client.rb', line 127

    def fetch_messages(topic:, partition:, offset: :latest, max_wait_time: 5, min_bytes: 1, max_bytes: 1048576)
      operation = FetchOperation.new(
        cluster: @cluster,
        logger: @logger,
        min_bytes: min_bytes,
        max_wait_time: max_wait_time,
      )

      operation.fetch_from_partition(topic, partition, offset: offset, max_bytes: max_bytes)

      operation.execute
    end
#partitions_for(topic) ⇒ Integer
Counts the number of partitions in a topic.
    # File 'lib/kafka/client.rb', line 151

    def partitions_for(topic)
      @cluster.partitions_for(topic).count
    end
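A common reason to ask for the partition count is to map a message key to a partition deterministically, which is what a partitioner does. A hedged pure-Ruby sketch; the helper name is made up, though ruby-kafka's default partitioner also hashes the key with CRC32 in a similar fashion:

```ruby
require "zlib"

# Illustrative: pick a partition for a key, given the partition count that
# #partitions_for would return for the topic.
def partition_for_key(key, partition_count)
  Zlib.crc32(key) % partition_count
end

# The same key always maps to the same partition for a fixed partition count.
partition_for_key("some-key", 16)
```

Note that if the topic's partition count changes, the same key will generally map to a different partition, so key-to-partition affinity only holds while the count is stable.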
#producer(**options) ⇒ Kafka::Producer
Builds a new producer.
options are passed to Producer#initialize.
    # File 'lib/kafka/client.rb', line 51

    def producer(**options)
      Producer.new(cluster: @cluster, logger: @logger, **options)
    end
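A minimal usage sketch, assuming the ruby-kafka gem and a reachable cluster; the broker address, topic, and key below are placeholders:

```ruby
require "kafka"

kafka = Kafka::Client.new(seed_brokers: ["localhost:9092"])
producer = kafka.producer

# #produce only appends to a local buffer; nothing is sent over the network
# until #deliver_messages is called.
producer.produce("hello", topic: "greetings", key: "greeting-1")
producer.deliver_messages

# Shut the producer down when done to release its broker connections.
producer.shutdown
```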
#topics ⇒ Array<String>
Lists all topics in the cluster.
    # File 'lib/kafka/client.rb', line 143

    def topics
      @cluster.topics
    end