Class: Kafka::Broker
- Inherits: Object
- Inheritance chain: Object → Kafka::Broker
- Defined in:
- lib/kafka/broker.rb
Instance Method Summary
- #address_match?(host, port) ⇒ Boolean
- #api_versions ⇒ Object
- #commit_offsets(**options) ⇒ Object
- #create_topics(**options) ⇒ Object
- #disconnect ⇒ nil
-
#fetch_messages(**options) ⇒ Kafka::Protocol::FetchResponse
Fetches messages from a specified topic and partition.
-
#fetch_metadata(**options) ⇒ Kafka::Protocol::MetadataResponse
Fetches cluster metadata from the broker.
- #fetch_offsets(**options) ⇒ Object
- #find_group_coordinator(**options) ⇒ Object
- #heartbeat(**options) ⇒ Object
-
#initialize(connection_builder:, host:, port:, node_id: nil, logger:) ⇒ Broker
constructor
A new instance of Broker.
- #join_group(**options) ⇒ Object
- #leave_group(**options) ⇒ Object
-
#list_offsets(**options) ⇒ Kafka::Protocol::ListOffsetResponse
Lists the offset of the specified topics and partitions.
-
#produce(**options) ⇒ Kafka::Protocol::ProduceResponse
Produces a set of messages to the broker.
- #sync_group(**options) ⇒ Object
- #to_s ⇒ String
Constructor Details
#initialize(connection_builder:, host:, port:, node_id: nil, logger:) ⇒ Broker
Returns a new instance of Broker.
7 8 9 10 11 12 13 14 |
# File 'lib/kafka/broker.rb', line 7
#
# Stores the collaborators and address of a broker. No connection is made
# here: @connection is set to nil.
#
# @param connection_builder [Object] stored as @connection_builder
# @param host [Object] broker host, stored as @host (compared with == in address_match?)
# @param port [Object] broker port, stored as @port (compared with == in address_match?)
# @param node_id [Object, nil] optional node id, stored as @node_id
# @param logger [Object] stored as @logger
def initialize(connection_builder:, host:, port:, node_id: nil, logger:)
  @connection_builder = connection_builder
  @connection = nil
  @host = host
  @port = port
  @node_id = node_id
  @logger = logger
end
Instance Method Details
#address_match?(host, port) ⇒ Boolean
16 17 18 |
# File 'lib/kafka/broker.rb', line 16
#
# Tells whether the given address is the one this broker was created with.
#
# @param host [Object] host to compare against @host using ==
# @param port [Object] port to compare against @port using ==
# @return [Boolean] true only when both host and port are equal to the stored values
def address_match?(host, port)
  host == @host && port == @port
end
#api_versions ⇒ Object
118 119 120 121 122 |
# File 'lib/kafka/broker.rb', line 118
#
# Builds a Protocol::ApiVersionsRequest (no arguments) and hands it to
# send_request, which is defined elsewhere in this class.
#
# @return [Object] whatever send_request returns for the request
def api_versions
  request = Protocol::ApiVersionsRequest.new
  send_request(request)
end
#commit_offsets(**options) ⇒ Object
76 77 78 79 80 |
# File 'lib/kafka/broker.rb', line 76
#
# Builds a Protocol::OffsetCommitRequest from the given keyword options and
# hands it to send_request, which is defined elsewhere in this class.
#
# @param options [Hash] keyword arguments forwarded unchanged to
#   Protocol::OffsetCommitRequest.new; the accepted keys are defined by that class
# @return [Object] whatever send_request returns for the request
def commit_offsets(**options)
  request = Protocol::OffsetCommitRequest.new(**options)
  send_request(request)
end
#create_topics(**options) ⇒ Object
112 113 114 115 116 |
# File 'lib/kafka/broker.rb', line 112
#
# Builds a Protocol::CreateTopicsRequest from the given keyword options and
# hands it to send_request, which is defined elsewhere in this class.
#
# @param options [Hash] keyword arguments forwarded unchanged to
#   Protocol::CreateTopicsRequest.new; the accepted keys are defined by that class
# @return [Object] whatever send_request returns for the request
def create_topics(**options)
  request = Protocol::CreateTopicsRequest.new(**options)
  send_request(request)
end
#disconnect ⇒ nil
26 27 28 |
# File 'lib/kafka/broker.rb', line 26
#
# Closes the broker's connection by calling close on the object returned by
# the connection helper (defined elsewhere in this class).
#
# NOTE(review): if the connection helper creates the connection on demand,
# this opens one only to close it -- confirm against that helper.
#
# @return [nil] always nil, as documented, regardless of what close returns
def disconnect
  connection.close
  nil
end
#fetch_messages(**options) ⇒ Kafka::Protocol::FetchResponse
Fetches messages from a specified topic and partition.
44 45 46 47 48 |
# File 'lib/kafka/broker.rb', line 44
#
# Fetches messages from a specified topic and partition.
#
# Builds a Protocol::FetchRequest from the given keyword options and hands it
# to send_request, which is defined elsewhere in this class.
#
# @param options [Hash] keyword arguments forwarded unchanged to
#   Protocol::FetchRequest.new; the accepted keys are defined by that class
# @return [Kafka::Protocol::FetchResponse] the value returned by send_request
#   (type as documented for this method)
def fetch_messages(**options)
  request = Protocol::FetchRequest.new(**options)
  send_request(request)
end
#fetch_metadata(**options) ⇒ Kafka::Protocol::MetadataResponse
Fetches cluster metadata from the broker.
34 35 36 37 38 |
# File 'lib/kafka/broker.rb', line 34
#
# Fetches cluster metadata from the broker.
#
# Builds a Protocol::TopicMetadataRequest from the given keyword options and
# hands it to send_request, which is defined elsewhere in this class.
#
# @param options [Hash] keyword arguments forwarded unchanged to
#   Protocol::TopicMetadataRequest.new; the accepted keys are defined by that class
# @return [Kafka::Protocol::MetadataResponse] the value returned by send_request
#   (type as documented for this method)
def fetch_metadata(**options)
  request = Protocol::TopicMetadataRequest.new(**options)
  send_request(request)
end
#fetch_offsets(**options) ⇒ Object
70 71 72 73 74 |
# File 'lib/kafka/broker.rb', line 70
#
# Builds a Protocol::OffsetFetchRequest from the given keyword options and
# hands it to send_request, which is defined elsewhere in this class.
#
# @param options [Hash] keyword arguments forwarded unchanged to
#   Protocol::OffsetFetchRequest.new; the accepted keys are defined by that class
# @return [Object] whatever send_request returns for the request
def fetch_offsets(**options)
  request = Protocol::OffsetFetchRequest.new(**options)
  send_request(request)
end
#find_group_coordinator(**options) ⇒ Object
100 101 102 103 104 |
# File 'lib/kafka/broker.rb', line 100
#
# Builds a Protocol::GroupCoordinatorRequest from the given keyword options
# and hands it to send_request, which is defined elsewhere in this class.
#
# @param options [Hash] keyword arguments forwarded unchanged to
#   Protocol::GroupCoordinatorRequest.new; the accepted keys are defined by that class
# @return [Object] whatever send_request returns for the request
def find_group_coordinator(**options)
  request = Protocol::GroupCoordinatorRequest.new(**options)
  send_request(request)
end
#heartbeat(**options) ⇒ Object
106 107 108 109 110 |
# File 'lib/kafka/broker.rb', line 106
#
# Builds a Protocol::HeartbeatRequest from the given keyword options and
# hands it to send_request, which is defined elsewhere in this class.
#
# @param options [Hash] keyword arguments forwarded unchanged to
#   Protocol::HeartbeatRequest.new; the accepted keys are defined by that class
# @return [Object] whatever send_request returns for the request
def heartbeat(**options)
  request = Protocol::HeartbeatRequest.new(**options)
  send_request(request)
end
#join_group(**options) ⇒ Object
82 83 84 85 86 |
# File 'lib/kafka/broker.rb', line 82
#
# Builds a Protocol::JoinGroupRequest from the given keyword options and
# hands it to send_request, which is defined elsewhere in this class.
#
# @param options [Hash] keyword arguments forwarded unchanged to
#   Protocol::JoinGroupRequest.new; the accepted keys are defined by that class
# @return [Object] whatever send_request returns for the request
def join_group(**options)
  request = Protocol::JoinGroupRequest.new(**options)
  send_request(request)
end
#leave_group(**options) ⇒ Object
94 95 96 97 98 |
# File 'lib/kafka/broker.rb', line 94
#
# Builds a Protocol::LeaveGroupRequest from the given keyword options and
# hands it to send_request, which is defined elsewhere in this class.
#
# @param options [Hash] keyword arguments forwarded unchanged to
#   Protocol::LeaveGroupRequest.new; the accepted keys are defined by that class
# @return [Object] whatever send_request returns for the request
def leave_group(**options)
  request = Protocol::LeaveGroupRequest.new(**options)
  send_request(request)
end
#list_offsets(**options) ⇒ Kafka::Protocol::ListOffsetResponse
Lists the offset of the specified topics and partitions.
54 55 56 57 58 |
# File 'lib/kafka/broker.rb', line 54
#
# Lists the offset of the specified topics and partitions.
#
# Builds a Protocol::ListOffsetRequest from the given keyword options and
# hands it to send_request, which is defined elsewhere in this class.
#
# @param options [Hash] keyword arguments forwarded unchanged to
#   Protocol::ListOffsetRequest.new; the accepted keys are defined by that class
# @return [Kafka::Protocol::ListOffsetResponse] the value returned by send_request
#   (type as documented for this method)
def list_offsets(**options)
  request = Protocol::ListOffsetRequest.new(**options)
  send_request(request)
end
#produce(**options) ⇒ Kafka::Protocol::ProduceResponse
Produces a set of messages to the broker.
64 65 66 67 68 |
# File 'lib/kafka/broker.rb', line 64
#
# Produces a set of messages to the broker.
#
# Builds a Protocol::ProduceRequest from the given keyword options and hands
# it to send_request, which is defined elsewhere in this class.
#
# @param options [Hash] keyword arguments forwarded unchanged to
#   Protocol::ProduceRequest.new; the accepted keys are defined by that class
# @return [Kafka::Protocol::ProduceResponse] the value returned by send_request
#   (type as documented for this method)
def produce(**options)
  request = Protocol::ProduceRequest.new(**options)
  send_request(request)
end
#sync_group(**options) ⇒ Object
88 89 90 91 92 |
# File 'lib/kafka/broker.rb', line 88
#
# Builds a Protocol::SyncGroupRequest from the given keyword options and
# hands it to send_request, which is defined elsewhere in this class.
#
# @param options [Hash] keyword arguments forwarded unchanged to
#   Protocol::SyncGroupRequest.new; the accepted keys are defined by that class
# @return [Object] whatever send_request returns for the request
def sync_group(**options)
  request = Protocol::SyncGroupRequest.new(**options)
  send_request(request)
end
#to_s ⇒ String
21 22 23 |
# File 'lib/kafka/broker.rb', line 21
#
# Describes the broker as the string form of its connection followed by the
# inspected node id, e.g. "<connection> (node_id=3)" or "... (node_id=nil)".
#
# The connection helper is defined elsewhere in this class.
# NOTE(review): if that helper creates the connection on demand, merely
# printing the broker would establish one -- confirm against that helper.
#
# @return [String]
def to_s
  "#{connection} (node_id=#{@node_id.inspect})"
end