Class: Kafka::Broker
- Inherits:
-
Object
- Object
- Kafka::Broker
- Defined in:
- lib/kafka/broker.rb
Class Method Summary collapse
Instance Method Summary collapse
- #disconnect ⇒ Object
- #fetch_metadata(**options) ⇒ Object
-
#initialize(connection:, node_id: nil, logger:) ⇒ Broker
constructor
A new instance of Broker.
- #produce(**options) ⇒ Object
- #to_s ⇒ Object
Constructor Details
#initialize(connection:, node_id: nil, logger:) ⇒ Broker
Returns a new instance of Broker.
12 13 14 15 16 |
# File 'lib/kafka/broker.rb', line 12 def initialize(connection:, node_id: nil, logger:) @connection = connection @node_id = node_id @logger = logger end |
Class Method Details
.connect(node_id: nil, logger:, **options) ⇒ Object
7 8 9 10 |
# File 'lib/kafka/broker.rb', line 7 def self.connect(node_id: nil, logger:, **) connection = Connection.new(logger: logger, **) new(connection: connection, node_id: node_id, logger: logger) end |
Instance Method Details
#disconnect ⇒ Object
22 23 24 |
# File 'lib/kafka/broker.rb', line 22 def disconnect @connection.close end |
#fetch_metadata(**options) ⇒ Object
26 27 28 29 30 31 |
# File 'lib/kafka/broker.rb', line 26 def (**) request = Protocol::TopicMetadataRequest.new(**) response_class = Protocol::MetadataResponse @connection.send_request(request, response_class) end |
#produce(**options) ⇒ Object
33 34 35 36 37 38 |
# File 'lib/kafka/broker.rb', line 33 def produce(**) request = Protocol::ProduceRequest.new(**) response_class = request.requires_acks? ? Protocol::ProduceResponse : nil @connection.send_request(request, response_class) end |
#to_s ⇒ Object
18 19 20 |
# File 'lib/kafka/broker.rb', line 18 def to_s "#{@connection} (node_id=#{@node_id.inspect})" end |