Class: Kafka::Broker

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/broker.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection:, node_id: nil, logger:) ⇒ Broker

Returns a new instance of Broker.



12
13
14
15
16
# File 'lib/kafka/broker.rb', line 12

def initialize(connection:, node_id: nil, logger:)
  @connection = connection
  @node_id = node_id
  @logger = logger
end

Class Method Details

.connect(node_id: nil, logger:, **options) ⇒ Object



7
8
9
10
# File 'lib/kafka/broker.rb', line 7

def self.connect(node_id: nil, logger:, **options)
  connection = Connection.new(logger: logger, **options)
  new(connection: connection, node_id: node_id, logger: logger)
end

Instance Method Details

#disconnectObject



22
23
24
# File 'lib/kafka/broker.rb', line 22

def disconnect
  @connection.close
end

#fetch_metadata(**options) ⇒ Object



26
27
28
29
30
31
# File 'lib/kafka/broker.rb', line 26

def (**options)
  request = Protocol::TopicMetadataRequest.new(**options)
  response_class = Protocol::MetadataResponse

  @connection.send_request(request, response_class)
end

#produce(**options) ⇒ Object



33
34
35
36
37
38
# File 'lib/kafka/broker.rb', line 33

def produce(**options)
  request = Protocol::ProduceRequest.new(**options)
  response_class = request.requires_acks? ? Protocol::ProduceResponse : nil

  @connection.send_request(request, response_class)
end

#to_sObject



18
19
20
# File 'lib/kafka/broker.rb', line 18

def to_s
  "#{@connection} (node_id=#{@node_id.inspect})"
end