Class: Kafka::Broker
- Inherits:
-
Object
- Object
- Kafka::Broker
- Defined in:
- lib/kafka/broker.rb
Instance Method Summary collapse
- #address_match?(host, port) ⇒ Boolean
- #commit_offsets(**options) ⇒ Object
- #disconnect ⇒ nil
-
#fetch_messages(**options) ⇒ Kafka::Protocol::FetchResponse
Fetches messages from a specified topic and partition.
-
#fetch_metadata(**options) ⇒ Kafka::Protocol::MetadataResponse
Fetches cluster metadata from the broker.
- #fetch_offsets(**options) ⇒ Object
- #find_group_coordinator(**options) ⇒ Object
- #heartbeat(**options) ⇒ Object
-
#initialize(connection:, node_id: nil, logger:) ⇒ Broker
constructor
A new instance of Broker.
- #join_group(**options) ⇒ Object
- #leave_group(**options) ⇒ Object
-
#list_offsets(**options) ⇒ Kafka::Protocol::ListOffsetResponse
Lists the offset of the specified topics and partitions.
-
#produce(**options) ⇒ Kafka::Protocol::ProduceResponse
Produces a set of messages to the broker.
- #sasl_handshake(**options) ⇒ Object
- #sync_group(**options) ⇒ Object
- #to_s ⇒ String
Constructor Details
#initialize(connection:, node_id: nil, logger:) ⇒ Broker
Returns a new instance of Broker.
7 8 9 10 11 |
# File 'lib/kafka/broker.rb', line 7 def initialize(connection:, node_id: nil, logger:) @connection = connection @node_id = node_id @logger = logger end |
Instance Method Details
#address_match?(host, port) ⇒ Boolean
13 14 15 |
# File 'lib/kafka/broker.rb', line 13 def address_match?(host, port) @connection.address_match?(host, port) end |
#commit_offsets(**options) ⇒ Object
73 74 75 76 77 |
# File 'lib/kafka/broker.rb', line 73 def commit_offsets(**) request = Protocol::OffsetCommitRequest.new(**) @connection.send_request(request) end |
#disconnect ⇒ nil
23 24 25 |
# File 'lib/kafka/broker.rb', line 23 def disconnect @connection.close end |
#fetch_messages(**options) ⇒ Kafka::Protocol::FetchResponse
Fetches messages from a specified topic and partition.
41 42 43 44 45 |
# File 'lib/kafka/broker.rb', line 41 def (**) request = Protocol::FetchRequest.new(**) @connection.send_request(request) end |
#fetch_metadata(**options) ⇒ Kafka::Protocol::MetadataResponse
Fetches cluster metadata from the broker.
31 32 33 34 35 |
# File 'lib/kafka/broker.rb', line 31 def (**) request = Protocol::TopicMetadataRequest.new(**) @connection.send_request(request) end |
#fetch_offsets(**options) ⇒ Object
67 68 69 70 71 |
# File 'lib/kafka/broker.rb', line 67 def fetch_offsets(**) request = Protocol::OffsetFetchRequest.new(**) @connection.send_request(request) end |
#find_group_coordinator(**options) ⇒ Object
97 98 99 100 101 |
# File 'lib/kafka/broker.rb', line 97 def find_group_coordinator(**) request = Protocol::GroupCoordinatorRequest.new(**) @connection.send_request(request) end |
#heartbeat(**options) ⇒ Object
103 104 105 106 107 |
# File 'lib/kafka/broker.rb', line 103 def heartbeat(**) request = Protocol::HeartbeatRequest.new(**) @connection.send_request(request) end |
#join_group(**options) ⇒ Object
79 80 81 82 83 |
# File 'lib/kafka/broker.rb', line 79 def join_group(**) request = Protocol::JoinGroupRequest.new(**) @connection.send_request(request) end |
#leave_group(**options) ⇒ Object
91 92 93 94 95 |
# File 'lib/kafka/broker.rb', line 91 def leave_group(**) request = Protocol::LeaveGroupRequest.new(**) @connection.send_request(request) end |
#list_offsets(**options) ⇒ Kafka::Protocol::ListOffsetResponse
Lists the offset of the specified topics and partitions.
51 52 53 54 55 |
# File 'lib/kafka/broker.rb', line 51 def list_offsets(**) request = Protocol::ListOffsetRequest.new(**) @connection.send_request(request) end |
#produce(**options) ⇒ Kafka::Protocol::ProduceResponse
Produces a set of messages to the broker.
61 62 63 64 65 |
# File 'lib/kafka/broker.rb', line 61 def produce(**) request = Protocol::ProduceRequest.new(**) @connection.send_request(request) end |
#sasl_handshake(**options) ⇒ Object
109 110 111 112 113 |
# File 'lib/kafka/broker.rb', line 109 def sasl_handshake(**) request = Protocol::SaslHandshakeRequest(**) @connection.send_request(request) end |
#sync_group(**options) ⇒ Object
85 86 87 88 89 |
# File 'lib/kafka/broker.rb', line 85 def sync_group(**) request = Protocol::SyncGroupRequest.new(**) @connection.send_request(request) end |
#to_s ⇒ String
18 19 20 |
# File 'lib/kafka/broker.rb', line 18 def to_s "#{@connection} (node_id=#{@node_id.inspect})" end |