Class: Kafka::BrokerPool

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/broker_pool.rb

Instance Method Summary

Constructor Details

#initialize(client_id:, connect_timeout: nil, socket_timeout: nil, logger:) ⇒ BrokerPool

Returns a new instance of BrokerPool.



5
6
7
8
9
10
11
# File 'lib/kafka/broker_pool.rb', line 5

# Sets up an empty broker pool and stores the settings it was given.
#
# @param client_id the client identifier, stored as-is in @client_id
# @param connect_timeout optional connect timeout, stored as-is (defaults to nil)
# @param socket_timeout optional socket timeout, stored as-is (defaults to nil)
# @param logger the logger object, stored as-is in @logger
def initialize(client_id:, connect_timeout: nil, socket_timeout: nil, logger:)
  # Collaborators and identity.
  @logger = logger
  @client_id = client_id

  # Timeouts are kept untouched; nil is stored when the caller omits them.
  @socket_timeout = socket_timeout
  @connect_timeout = connect_timeout

  # Cache of brokers keyed by node id; starts out empty.
  @brokers = {}
end

Instance Method Details

#close ⇒ Object



31
32
33
34
35
36
# File 'lib/kafka/broker_pool.rb', line 31

# Disconnects every broker currently held in the pool, writing an info-level
# log line for each one first. Entries are not removed from the pool.
#
# @return [Hash] the pool's internal node-id => broker hash
def close
  @brokers.each_pair do |node_id, cached_broker|
    @logger.info("Disconnecting broker #{node_id}")
    cached_broker.disconnect
  end
end

#connect(host, port, node_id: nil) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/kafka/broker_pool.rb', line 13

# Returns a broker for the given address, reusing a cached one when the pool
# already holds an entry for node_id.
#
# A newly connected broker is cached only when node_id is non-nil, so calls
# without a node id always open a fresh broker via Broker.connect.
#
# @param host the broker host, forwarded to Broker.connect
# @param port the broker port, forwarded to Broker.connect
# @param node_id the cache key for the broker, or nil to skip caching
# @return [Object] the cached or newly connected broker
def connect(host, port, node_id: nil)
  if @brokers.key?(node_id)
    @brokers.fetch(node_id)
  else
    fresh = Broker.connect(
      host: host,
      port: port,
      node_id: node_id,
      client_id: @client_id,
      connect_timeout: @connect_timeout,
      socket_timeout: @socket_timeout,
      logger: @logger
    )

    # Only brokers with a known node id are remembered.
    @brokers[node_id] = fresh unless node_id.nil?
    fresh
  end
end