Class: Kafka::Broker

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/broker.rb

Instance Method Summary collapse

Constructor Details

#initialize(connection:, node_id: nil, logger:) ⇒ Broker

Returns a new instance of Broker.



7
8
9
10
11
# File 'lib/kafka/broker.rb', line 7

def initialize(connection:, node_id: nil, logger:)
  @connection = connection
  @node_id = node_id
  @logger = logger
end

Instance Method Details

#commit_offsets(**options) ⇒ Object



69
70
71
72
73
# File 'lib/kafka/broker.rb', line 69

def commit_offsets(**options)
  request = Protocol::OffsetCommitRequest.new(**options)

  @connection.send_request(request)
end

#disconnectnil

Returns:

  • (nil)


19
20
21
# File 'lib/kafka/broker.rb', line 19

def disconnect
  @connection.close
end

#fetch_messages(**options) ⇒ Kafka::Protocol::FetchResponse

Fetches messages from a specified topic and partition.

Parameters:

  • max_wait_time (Integer)
  • min_bytes (Integer)
  • topics (Hash)

Returns:



37
38
39
40
41
# File 'lib/kafka/broker.rb', line 37

def fetch_messages(**options)
  request = Protocol::FetchRequest.new(**options)

  @connection.send_request(request)
end

#fetch_metadata(**options) ⇒ Kafka::Protocol::MetadataResponse

Fetches cluster metadata from the broker.

Parameters:

  • topics (Array<String>)

Returns:



27
28
29
30
31
# File 'lib/kafka/broker.rb', line 27

def (**options)
  request = Protocol::TopicMetadataRequest.new(**options)

  @connection.send_request(request)
end

#fetch_offsets(**options) ⇒ Object



63
64
65
66
67
# File 'lib/kafka/broker.rb', line 63

def fetch_offsets(**options)
  request = Protocol::OffsetFetchRequest.new(**options)

  @connection.send_request(request)
end

#find_group_coordinator(**options) ⇒ Object



93
94
95
96
97
# File 'lib/kafka/broker.rb', line 93

def find_group_coordinator(**options)
  request = Protocol::GroupCoordinatorRequest.new(**options)

  @connection.send_request(request)
end

#heartbeat(**options) ⇒ Object



99
100
101
102
103
# File 'lib/kafka/broker.rb', line 99

def heartbeat(**options)
  request = Protocol::HeartbeatRequest.new(**options)

  @connection.send_request(request)
end

#join_group(**options) ⇒ Object



75
76
77
78
79
# File 'lib/kafka/broker.rb', line 75

def join_group(**options)
  request = Protocol::JoinGroupRequest.new(**options)

  @connection.send_request(request)
end

#leave_group(**options) ⇒ Object



87
88
89
90
91
# File 'lib/kafka/broker.rb', line 87

def leave_group(**options)
  request = Protocol::LeaveGroupRequest.new(**options)

  @connection.send_request(request)
end

#list_offsets(**options) ⇒ Kafka::Protocol::ListOffsetResponse

Lists the offset of the specified topics and partitions.

Parameters:

  • topics (Hash)

Returns:



47
48
49
50
51
# File 'lib/kafka/broker.rb', line 47

def list_offsets(**options)
  request = Protocol::ListOffsetRequest.new(**options)

  @connection.send_request(request)
end

#produce(**options) ⇒ Kafka::Protocol::ProduceResponse

Produces a set of messages to the broker.

Parameters:

  • required_acks (Integer)
  • timeout (Integer)
  • messages_for_topics (Hash)

Returns:



57
58
59
60
61
# File 'lib/kafka/broker.rb', line 57

def produce(**options)
  request = Protocol::ProduceRequest.new(**options)

  @connection.send_request(request)
end

#sync_group(**options) ⇒ Object



81
82
83
84
85
# File 'lib/kafka/broker.rb', line 81

def sync_group(**options)
  request = Protocol::SyncGroupRequest.new(**options)

  @connection.send_request(request)
end

#to_sString

Returns:

  • (String)


14
15
16
# File 'lib/kafka/broker.rb', line 14

def to_s
  "#{@connection} (node_id=#{@node_id.inspect})"
end