Class: Kafka::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/client.rb

Constant Summary collapse

DEFAULT_CLIENT_ID =
"ruby-kafka"

Instance Method Summary collapse

Constructor Details

#initialize(seed_brokers:, client_id: DEFAULT_CLIENT_ID, logger:, socket_timeout: nil) ⇒ Client

Initializes a new Kafka client.

Parameters:

  • seed_brokers (Array<String>)

    the list of brokers used to initialize the client.

  • client_id (String) (defaults to: DEFAULT_CLIENT_ID)

    the identifier for this application.

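  • logger (Logger)

    the logger used by the client; it is also passed to the broker pool and to producers built by #get_producer.
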
  • socket_timeout (Integer, nil) (defaults to: nil)

    the timeout setting for socket connections. See BrokerPool#initialize.



21
22
23
24
25
26
27
28
29
30
# File 'lib/kafka/client.rb', line 21

# Initializes a new Kafka client.
#
# The logger is kept on the client and is also handed to the broker pool
# that is created here.
#
# @param seed_brokers [Array<String>] the list of brokers used to
#   initialize the client.
# @param client_id [String] the identifier for this application.
# @param logger [Logger] the logger shared by the client and its broker pool.
# @param socket_timeout [Integer, nil] the timeout setting for socket
#   connections. See {BrokerPool#initialize}.
# @return [Client]
def initialize(seed_brokers:, client_id: DEFAULT_CLIENT_ID, logger:, socket_timeout: nil)
  # Every constructor argument is forwarded to the pool unchanged.
  pool_options = {
    seed_brokers: seed_brokers,
    client_id: client_id,
    logger: logger,
    socket_timeout: socket_timeout,
  }

  @logger = logger
  @broker_pool = BrokerPool.new(**pool_options)
end

Instance Method Details

#close ⇒ Object



49
50
51
# File 'lib/kafka/client.rb', line 49

# Closes the client by shutting down its broker pool.
#
# The return value is whatever the pool's +shutdown+ returns.
# NOTE(review): presumably this releases the broker connections held by the
# pool -- confirm against BrokerPool#shutdown.
def close
  @broker_pool.shutdown
end

#get_producer(**options) ⇒ Producer

Builds a new producer.

options are passed to Producer#initialize.

Returns:

  • (Producer)

See Also:

  • Producer#initialize



38
39
40
# File 'lib/kafka/client.rb', line 38

# Builds a new producer that shares this client's broker pool and logger.
#
# @param options [Hash] extra keyword options, passed to
#   {Producer#initialize}.
# @return [Producer]
# @see Producer#initialize
def get_producer(**options)
  # Caller-supplied options are merged last, so they keep the same
  # precedence they have when splatted after the fixed keywords.
  producer_options = { broker_pool: @broker_pool, logger: @logger }.merge(options)
  Producer.new(**producer_options)
end

#topics ⇒ Array<String>

Lists all topics in the cluster.

Returns:

  • (Array<String>)

    the list of topic names.



45
46
47
# File 'lib/kafka/client.rb', line 45

# Lists all topics in the cluster.
#
# The lookup is delegated to the client's broker pool.
#
# @return [Array<String>] the list of topic names.
def topics
  @broker_pool.topics
end