Class: Kafka::Protocol::JoinGroupRequest

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/protocol/join_group_request.rb

Constant Summary collapse

PROTOCOL_TYPE =
"consumer"

Instance Method Summary collapse

Constructor Details

#initialize(group_id:, session_timeout:, member_id:, topics: []) ⇒ JoinGroupRequest

Returns a new instance of JoinGroupRequest.



8
9
10
11
12
13
14
15
16
# File 'lib/kafka/protocol/join_group_request.rb', line 8

def initialize(group_id:, session_timeout:, member_id:, topics: [])
  @group_id = group_id
  @session_timeout = session_timeout * 1000 # Kafka wants ms.
  @member_id = member_id || ""
  @protocol_type = PROTOCOL_TYPE
  @group_protocols = {
    "standard" => ConsumerGroupProtocol.new(topics: ["test-messages"]),
  }
end

Instance Method Details

#api_keyObject



18
19
20
# File 'lib/kafka/protocol/join_group_request.rb', line 18

def api_key
  JOIN_GROUP_API
end

#encode(encoder) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
# File 'lib/kafka/protocol/join_group_request.rb', line 26

def encode(encoder)
  encoder.write_string(@group_id)
  encoder.write_int32(@session_timeout)
  encoder.write_string(@member_id)
  encoder.write_string(@protocol_type)

  encoder.write_array(@group_protocols) do |name, |
    encoder.write_string(name)
    encoder.write_bytes(Encoder.encode_with())
  end
end

#response_classObject



22
23
24
# File 'lib/kafka/protocol/join_group_request.rb', line 22

def response_class
  JoinGroupResponse
end