Class: Kafka::Broker

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/broker.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection:, node_id: nil, logger:) ⇒ Broker

Returns a new instance of Broker.



12
13
14
15
16
# File 'lib/kafka/broker.rb', line 12

# Builds a broker around an already-constructed connection.
#
# @param connection [Object] the connection used to talk to the broker;
#   other methods of this class call #send_request and #close on it.
# @param node_id [Object, nil] identifier of the broker node; defaults to nil.
# @param logger [Object] logger stored for later use by the instance.
def initialize(connection:, node_id: nil, logger:)
  # Collaborators first, then the identifying metadata.
  @connection = connection
  @logger = logger

  @node_id = node_id
end

Class Method Details

.connect(node_id: nil, logger:, **options) ⇒ Object



7
8
9
10
# File 'lib/kafka/broker.rb', line 7

# Opens a new connection and wraps it in a broker instance.
#
# @param node_id [Object, nil] identifier handed to the new instance;
#   defaults to nil.
# @param logger [Object] logger shared by the connection and the instance.
# @param options [Hash] remaining keyword arguments, forwarded unchanged to
#   Connection.new.
# @return [Object] the instance returned by .new.
def self.connect(node_id: nil, logger:, **options)
  new(
    connection: Connection.new(logger: logger, **options),
    node_id: node_id,
    logger: logger
  )
end

Instance Method Details

#disconnect ⇒ Object



22
23
24
# File 'lib/kafka/broker.rb', line 22

# Closes the broker's underlying connection by calling #close on it.
#
# @return [Object] whatever the connection's #close returns.
def disconnect
  @connection.close
end

#fetch_metadata(**options) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/kafka/broker.rb', line 26

# Requests topic metadata from the broker and checks the reply for errors.
#
# Builds a Protocol::TopicMetadataRequest from +options+ and sends it over
# the broker's connection, passing Protocol::MetadataResponse as the response
# class. Every topic-level and partition-level error code in the response is
# then handed to Protocol.handle_error.
#
# @param options [Hash] keyword arguments forwarded unchanged to
#   Protocol::TopicMetadataRequest.new.
# @return [Object] the response returned by the connection's #send_request.
# @raise [StandardError] whatever Protocol.handle_error raises for a topic
#   error code, or for a partition error code other than ReplicaNotAvailable.
def fetch_metadata(**options)
  request = Protocol::TopicMetadataRequest.new(**options)
  response_class = Protocol::MetadataResponse

  response = @connection.send_request(request, response_class)

  response.topics.each do |topic|
    # A topic-level error is not rescued here and aborts the whole call.
    Protocol.handle_error(topic.topic_error_code)

    topic.partitions.each do |partition|
      begin
        Protocol.handle_error(partition.partition_error_code)
      rescue ReplicaNotAvailable
        # This error can be safely ignored per the protocol specification,
        # so it is only logged and the remaining partitions are still checked.
        @logger.warn "Replica not available for #{topic.topic_name}/#{partition.partition_id}"
      end
    end
  end

  response
end

#produce(**options) ⇒ Object



48
49
50
51
52
53
# File 'lib/kafka/broker.rb', line 48

# Sends a produce request over the broker's connection.
#
# @param options [Hash] keyword arguments forwarded unchanged to
#   Protocol::ProduceRequest.new.
# @return [Object] whatever the connection's #send_request returns.
def produce(**options)
  produce_request = Protocol::ProduceRequest.new(**options)

  # Left as nil when the request does not require acks.
  # NOTE(review): presumably nil tells the connection not to wait for a
  # response; confirm in Connection#send_request.
  expected_response = Protocol::ProduceResponse if produce_request.requires_acks?

  @connection.send_request(produce_request, expected_response)
end

#to_s ⇒ Object



18
19
20
# File 'lib/kafka/broker.rb', line 18

# Describes the broker as its connection followed by its node id.
#
# @return [String] the connection's string form, then " (node_id=...)" with
#   the node id rendered via #inspect (so a missing id shows as "nil").
def to_s
  node_label = @node_id.inspect
  "#{@connection} (node_id=#{node_label})"
end