Class: Kafka::Broker

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/broker.rb

Instance Method Summary collapse

Constructor Details

#initialize(connection:, node_id: nil, logger:) ⇒ Broker

Returns a new instance of Broker.



7
8
9
10
11
# File 'lib/kafka/broker.rb', line 7

def initialize(connection:, node_id: nil, logger:)
  @connection = connection
  @node_id = node_id
  @logger = logger
end

Instance Method Details

#address_match?(host, port) ⇒ Boolean

Returns:

  • (Boolean)


13
14
15
# File 'lib/kafka/broker.rb', line 13

def address_match?(host, port)
  @connection.address_match?(host, port)
end

#commit_offsets(**options) ⇒ Object



73
74
75
76
77
# File 'lib/kafka/broker.rb', line 73

def commit_offsets(**options)
  request = Protocol::OffsetCommitRequest.new(**options)

  @connection.send_request(request)
end

#disconnectnil

Returns:

  • (nil)


23
24
25
# File 'lib/kafka/broker.rb', line 23

def disconnect
  @connection.close
end

#fetch_messages(**options) ⇒ Kafka::Protocol::FetchResponse

Fetches messages from a specified topic and partition.

Parameters:

  • max_wait_time (Integer)
  • min_bytes (Integer)
  • topics (Hash)

Returns:



41
42
43
44
45
# File 'lib/kafka/broker.rb', line 41

def fetch_messages(**options)
  request = Protocol::FetchRequest.new(**options)

  @connection.send_request(request)
end

#fetch_metadata(**options) ⇒ Kafka::Protocol::MetadataResponse

Fetches cluster metadata from the broker.

Parameters:

  • topics (Array<String>)

Returns:



31
32
33
34
35
# File 'lib/kafka/broker.rb', line 31

def (**options)
  request = Protocol::TopicMetadataRequest.new(**options)

  @connection.send_request(request)
end

#fetch_offsets(**options) ⇒ Object



67
68
69
70
71
# File 'lib/kafka/broker.rb', line 67

def fetch_offsets(**options)
  request = Protocol::OffsetFetchRequest.new(**options)

  @connection.send_request(request)
end

#find_group_coordinator(**options) ⇒ Object



97
98
99
100
101
# File 'lib/kafka/broker.rb', line 97

def find_group_coordinator(**options)
  request = Protocol::GroupCoordinatorRequest.new(**options)

  @connection.send_request(request)
end

#heartbeat(**options) ⇒ Object



103
104
105
106
107
# File 'lib/kafka/broker.rb', line 103

def heartbeat(**options)
  request = Protocol::HeartbeatRequest.new(**options)

  @connection.send_request(request)
end

#join_group(**options) ⇒ Object



79
80
81
82
83
# File 'lib/kafka/broker.rb', line 79

def join_group(**options)
  request = Protocol::JoinGroupRequest.new(**options)

  @connection.send_request(request)
end

#leave_group(**options) ⇒ Object



91
92
93
94
95
# File 'lib/kafka/broker.rb', line 91

def leave_group(**options)
  request = Protocol::LeaveGroupRequest.new(**options)

  @connection.send_request(request)
end

#list_offsets(**options) ⇒ Kafka::Protocol::ListOffsetResponse

Lists the offset of the specified topics and partitions.

Parameters:

  • topics (Hash)

Returns:



51
52
53
54
55
# File 'lib/kafka/broker.rb', line 51

def list_offsets(**options)
  request = Protocol::ListOffsetRequest.new(**options)

  @connection.send_request(request)
end

#produce(**options) ⇒ Kafka::Protocol::ProduceResponse

Produces a set of messages to the broker.

Parameters:

  • required_acks (Integer)
  • timeout (Integer)
  • messages_for_topics (Hash)

Returns:



61
62
63
64
65
# File 'lib/kafka/broker.rb', line 61

def produce(**options)
  request = Protocol::ProduceRequest.new(**options)

  @connection.send_request(request)
end

#sasl_handshake(**options) ⇒ Object



109
110
111
112
113
# File 'lib/kafka/broker.rb', line 109

def sasl_handshake(**options)
  request = Protocol::SaslHandshakeRequest(**options)

  @connection.send_request(request)
end

#sync_group(**options) ⇒ Object



85
86
87
88
89
# File 'lib/kafka/broker.rb', line 85

def sync_group(**options)
  request = Protocol::SyncGroupRequest.new(**options)

  @connection.send_request(request)
end

#to_sString

Returns:

  • (String)


18
19
20
# File 'lib/kafka/broker.rb', line 18

def to_s
  "#{@connection} (node_id=#{@node_id.inspect})"
end