Class: Kafka::Client
- Inherits: Object
- Defined in: lib/kafka/client.rb
Instance Method Summary
- #async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: 1000, **options) ⇒ AsyncProducer
  Creates a new AsyncProducer instance.
- #close ⇒ nil
  Closes all connections to the Kafka brokers and frees up used resources.
- #consumer(group_id:, session_timeout: 30, offset_commit_interval: 10, offset_commit_threshold: 0) ⇒ Consumer
  Creates a new Kafka consumer.
- #fetch_messages(topic:, partition:, offset: :latest, max_wait_time: 5, min_bytes: 1, max_bytes: 1048576) ⇒ Array<Kafka::FetchedMessage>
  Fetches a batch of messages from a single partition.
- #initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_timeout: nil, socket_timeout: nil, ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil) ⇒ Client (constructor)
  Initializes a new Kafka client.
- #partitions_for(topic) ⇒ Integer
  Counts the number of partitions in a topic.
- #producer(compression_codec: nil, compression_threshold: 1, ack_timeout: 5, required_acks: 1, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000, max_buffer_bytesize: 10_000_000) ⇒ Kafka::Producer
  Initializes a new Kafka producer.
- #topics ⇒ Array<String>
  Lists all topics in the cluster.
Constructor Details
#initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_timeout: nil, socket_timeout: nil, ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil) ⇒ Client
Initializes a new Kafka client.
# File 'lib/kafka/client.rb', line 38

def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_timeout: nil, socket_timeout: nil, ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil)
  @logger = logger || Logger.new(nil)

  ssl_context = build_ssl_context(ssl_ca_cert, ssl_client_cert, ssl_client_cert_key)

  broker_pool = BrokerPool.new(
    client_id: client_id,
    connect_timeout: connect_timeout,
    socket_timeout: socket_timeout,
    logger: @logger,
    ssl_context: ssl_context,
  )

  @cluster = Cluster.new(
    seed_brokers: seed_brokers,
    broker_pool: broker_pool,
    logger: @logger,
  )
end
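As a minimal connection sketch, assuming the gem's top-level Kafka.new convenience method, which forwards its arguments to this constructor; the broker addresses and client id below are placeholders:

require "kafka"

# Pass one or more seed brokers; the rest of the cluster is
# discovered automatically.
kafka = Kafka.new(
  seed_brokers: ["kafka1:9092", "kafka2:9092"],
  client_id: "my-application",
)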
Instance Method Details
#async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: 1000, **options) ⇒ AsyncProducer
Creates a new AsyncProducer instance.
All parameters allowed by #producer can be passed, along with a few extra parameters that are specific to the async producer.
# File 'lib/kafka/client.rb', line 121

def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: 1000, **options)
  sync_producer = producer(**options)

  AsyncProducer.new(
    sync_producer: sync_producer,
    delivery_interval: delivery_interval,
    delivery_threshold: delivery_threshold,
    max_queue_size: max_queue_size,
  )
end
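A usage sketch, given a client as above; the topic name and interval values are placeholders:

producer = kafka.async_producer(
  delivery_interval: 10,    # deliver buffered messages every 10 seconds...
  delivery_threshold: 100,  # ...or whenever 100 messages have been buffered
)

producer.produce("hello", topic: "greetings")

# Delivery happens in a background thread; shut down cleanly when done.
producer.shutdown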
#close ⇒ nil
Closes all connections to the Kafka brokers and frees up used resources.
# File 'lib/kafka/client.rb', line 259

def close
  @cluster.disconnect
end
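For example, a common pattern is to close the client in an ensure block so connections are released even if an error is raised:

begin
  kafka.topics.each {|topic| puts topic }
ensure
  kafka.close
end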
#consumer(group_id:, session_timeout: 30, offset_commit_interval: 10, offset_commit_threshold: 0) ⇒ Consumer
Creates a new Kafka consumer.
# File 'lib/kafka/client.rb', line 143

def consumer(group_id:, session_timeout: 30, offset_commit_interval: 10, offset_commit_threshold: 0)
  group = ConsumerGroup.new(
    cluster: @cluster,
    logger: @logger,
    group_id: group_id,
    session_timeout: session_timeout,
  )

  offset_manager = OffsetManager.new(
    group: group,
    logger: @logger,
    commit_interval: offset_commit_interval,
    commit_threshold: offset_commit_threshold,
  )

  Consumer.new(
    cluster: @cluster,
    logger: @logger,
    group: group,
    offset_manager: offset_manager,
    session_timeout: session_timeout,
  )
end
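A usage sketch, given a client as above, relying on the consumer's subscribe and each_message methods; the group and topic names are placeholders:

consumer = kafka.consumer(group_id: "my-group")
consumer.subscribe("greetings")

# Blocks, yielding each message as it arrives.
consumer.each_message do |message|
  puts message.topic, message.partition, message.offset, message.value
end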
#fetch_messages(topic:, partition:, offset: :latest, max_wait_time: 5, min_bytes: 1, max_bytes: 1048576) ⇒ Array<Kafka::FetchedMessage>
This API is still alpha level. Don't try to use it in production.
Fetches a batch of messages from a single partition. Note that it's possible to get back empty batches.
The starting point for the fetch can be configured with the :offset argument.
If you pass a number, the fetch will start at that offset. However, there are
two special Symbol values that can be passed instead:
- :earliest: the first offset in the partition.
- :latest: the next offset that will be written to, effectively making the call block until there is a new message in the partition.
The Kafka protocol specifies the numeric values of these two options: -2 and -1, respectively. You can also pass in these numbers directly.
Example
When enumerating the messages in a partition, you typically fetch batches sequentially.
offset = :earliest

loop do
  messages = kafka.fetch_messages(
    topic: "my-topic",
    partition: 42,
    offset: offset,
  )

  messages.each do |message|
    puts message.offset, message.key, message.value

    # Set the next offset that should be read to be the subsequent
    # offset.
    offset = message.offset + 1
  end
end
See a working example in examples/simple-consumer.rb.
# File 'lib/kafka/client.rb', line 228

def fetch_messages(topic:, partition:, offset: :latest, max_wait_time: 5, min_bytes: 1, max_bytes: 1048576)
  operation = FetchOperation.new(
    cluster: @cluster,
    logger: @logger,
    min_bytes: min_bytes,
    max_wait_time: max_wait_time,
  )

  operation.fetch_from_partition(topic, partition, offset: offset, max_bytes: max_bytes)

  operation.execute.flat_map {|batch| batch.messages }
end
#partitions_for(topic) ⇒ Integer
Counts the number of partitions in a topic.
# File 'lib/kafka/client.rb', line 252

def partitions_for(topic)
  @cluster.partitions_for(topic).count
end
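For example, the partition count can be used to map a message key to a partition by hashing; a sketch using Ruby's Zlib.crc32, with placeholder topic and key:

require "zlib"

num_partitions = kafka.partitions_for("greetings")

# Deterministically map a key to one of the topic's partitions.
partition = Zlib.crc32("some-key") % num_partitions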
#producer(compression_codec: nil, compression_threshold: 1, ack_timeout: 5, required_acks: 1, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000, max_buffer_bytesize: 10_000_000) ⇒ Kafka::Producer
Initializes a new Kafka producer.
# File 'lib/kafka/client.rb', line 88

def producer(compression_codec: nil, compression_threshold: 1, ack_timeout: 5, required_acks: 1, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000, max_buffer_bytesize: 10_000_000)
  compressor = Compressor.new(
    codec_name: compression_codec,
    threshold: compression_threshold,
  )

  Producer.new(
    cluster: @cluster,
    logger: @logger,
    compressor: compressor,
    ack_timeout: ack_timeout,
    required_acks: required_acks,
    max_retries: max_retries,
    retry_backoff: retry_backoff,
    max_buffer_size: max_buffer_size,
    max_buffer_bytesize: max_buffer_bytesize,
  )
end
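A sketch of synchronous production, given a client as above; the topic, key, and value are placeholders:

producer = kafka.producer(required_acks: -1)  # -1 waits for all in-sync replicas

producer.produce("hello", topic: "greetings", key: "greeting-1")

# Messages are buffered locally until explicitly delivered.
producer.deliver_messages
producer.shutdown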
#topics ⇒ Array<String>
Lists all topics in the cluster.
# File 'lib/kafka/client.rb', line 244

def topics
  @cluster.topics
end
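For example, combining #topics with #partitions_for to print a quick overview of the cluster:

kafka.topics.each do |topic|
  puts "#{topic}: #{kafka.partitions_for(topic)} partitions"
end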