Module: Kafka::Protocol

Defined in:
lib/kafka/protocol.rb,
lib/kafka/protocol/decoder.rb,
lib/kafka/protocol/encoder.rb,
lib/kafka/protocol/message.rb,
lib/kafka/protocol/message_set.rb,
lib/kafka/protocol/fetch_request.rb,
lib/kafka/protocol/fetch_response.rb,
lib/kafka/protocol/produce_request.rb,
lib/kafka/protocol/request_message.rb,
lib/kafka/protocol/produce_response.rb,
lib/kafka/protocol/heartbeat_request.rb,
lib/kafka/protocol/member_assignment.rb,
lib/kafka/protocol/metadata_response.rb,
lib/kafka/protocol/heartbeat_response.rb,
lib/kafka/protocol/join_group_request.rb,
lib/kafka/protocol/sync_group_request.rb,
lib/kafka/protocol/join_group_response.rb,
lib/kafka/protocol/leave_group_request.rb,
lib/kafka/protocol/list_offset_request.rb,
lib/kafka/protocol/sync_group_response.rb,
lib/kafka/protocol/api_versions_request.rb,
lib/kafka/protocol/leave_group_response.rb,
lib/kafka/protocol/list_offset_response.rb,
lib/kafka/protocol/offset_fetch_request.rb,
lib/kafka/protocol/api_versions_response.rb,
lib/kafka/protocol/create_topics_request.rb,
lib/kafka/protocol/offset_commit_request.rb,
lib/kafka/protocol/offset_fetch_response.rb,
lib/kafka/protocol/create_topics_response.rb,
lib/kafka/protocol/offset_commit_response.rb,
lib/kafka/protocol/sasl_handshake_request.rb,
lib/kafka/protocol/topic_metadata_request.rb,
lib/kafka/protocol/consumer_group_protocol.rb,
lib/kafka/protocol/sasl_handshake_response.rb,
lib/kafka/protocol/group_coordinator_request.rb,
lib/kafka/protocol/group_coordinator_response.rb

Overview

The protocol layer of the library.

The Kafka protocol (https://kafka.apache.org/protocol) defines a set of API requests, each with a well-known numeric API key, as well as a set of error codes with specific meanings.

This module, and the classes contained in it, implement the client side of the protocol.

Defined Under Namespace

Classes: ApiVersionsRequest, ApiVersionsResponse, ConsumerGroupProtocol, CreateTopicsRequest, CreateTopicsResponse, Decoder, Encoder, FetchRequest, FetchResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ListOffsetRequest, ListOffsetResponse, MemberAssignment, Message, MessageSet, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceRequest, ProduceResponse, RequestMessage, SaslHandshakeRequest, SaslHandshakeResponse, SyncGroupRequest, SyncGroupResponse, TopicMetadataRequest

Constant Summary

REPLICA_ID =

The replica id of non-brokers is always -1.

-1
PRODUCE_API =
0
FETCH_API =
1
LIST_OFFSET_API =
2
TOPIC_METADATA_API =
3
OFFSET_COMMIT_API =
8
OFFSET_FETCH_API =
9
GROUP_COORDINATOR_API =
10
JOIN_GROUP_API =
11
HEARTBEAT_API =
12
LEAVE_GROUP_API =
13
SYNC_GROUP_API =
14
SASL_HANDSHAKE_API =
17
API_VERSIONS_API =
18
CREATE_TOPICS_API =
19
APIS =

A mapping from numeric API keys to symbolic API names.

{
  PRODUCE_API => :produce,
  FETCH_API => :fetch,
  LIST_OFFSET_API => :list_offset,
  TOPIC_METADATA_API => :topic_metadata,
  OFFSET_COMMIT_API => :offset_commit,
  OFFSET_FETCH_API => :offset_fetch,
  GROUP_COORDINATOR_API => :group_coordinator,
  JOIN_GROUP_API => :join_group,
  HEARTBEAT_API => :heartbeat,
  LEAVE_GROUP_API => :leave_group,
  SYNC_GROUP_API => :sync_group,
  SASL_HANDSHAKE_API => :sasl_handshake,
  API_VERSIONS_API => :api_versions,
  CREATE_TOPICS_API => :create_topics,
}
ERRORS =

A mapping from numeric error codes to exception classes.

{
  -1 => UnknownError,
   1 => OffsetOutOfRange,
   2 => CorruptMessage,
   3 => UnknownTopicOrPartition,
   4 => InvalidMessageSize,
   5 => LeaderNotAvailable,
   6 => NotLeaderForPartition,
   7 => RequestTimedOut,
   8 => BrokerNotAvailable,
   9 => ReplicaNotAvailable,
  10 => MessageSizeTooLarge,
  12 => OffsetMetadataTooLarge,
  15 => GroupCoordinatorNotAvailable,
  16 => NotCoordinatorForGroup,
  17 => InvalidTopic,
  18 => RecordListTooLarge,
  19 => NotEnoughReplicas,
  20 => NotEnoughReplicasAfterAppend,
  21 => InvalidRequiredAcks,
  22 => IllegalGeneration,
  25 => UnknownMemberId,
  26 => InvalidSessionTimeout,
  27 => RebalanceInProgress,
  28 => InvalidCommitOffsetSize,
  29 => TopicAuthorizationCode,
  30 => GroupAuthorizationCode,
  31 => ClusterAuthorizationCode,
  32 => InvalidTimestamp,
  33 => UnsupportedSaslMechanism,
  34 => InvalidSaslState,
  35 => UnsupportedVersion,
  36 => TopicAlreadyExists,
  37 => InvalidPartitions,
  38 => InvalidReplicationFactor,
  39 => InvalidReplicaAssignment,
  40 => InvalidConfig,
  41 => NotController,
  42 => InvalidRequest
}

Class Method Summary

  • .api_name(api_key) ⇒ Symbol

    Returns the symbolic name for an API key.

  • .handle_error(error_code) ⇒ nil

    Handles an error code by either doing nothing (if there was no error) or by raising an appropriate exception.

Class Method Details

.api_name(api_key) ⇒ Symbol

Returns the symbolic name for an API key, or :unknown if the key is not present in APIS.

Parameters:

  • api_key (Integer)

Returns:

  • (Symbol)


# File 'lib/kafka/protocol.rb', line 110

def self.api_name(api_key)
  APIS.fetch(api_key, :unknown)
end
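
The lookup above can be tried without installing the gem. The following is a minimal sketch, assuming a stand-in module (`ProtocolSketch`, a hypothetical name) that copies only three of the documented API keys; it is not the library's own module, but it uses the same `Hash#fetch` fallback shown in the method body.

```ruby
# Stand-in for illustration only: mirrors a subset of the documented APIS
# mapping so the example runs without the kafka gem.
module ProtocolSketch
  PRODUCE_API = 0
  FETCH_API = 1
  API_VERSIONS_API = 18

  APIS = {
    PRODUCE_API => :produce,
    FETCH_API => :fetch,
    API_VERSIONS_API => :api_versions,
  }.freeze

  # Same shape as the documented method: fall back to :unknown
  # when the key is not in the mapping.
  def self.api_name(api_key)
    APIS.fetch(api_key, :unknown)
  end
end

p ProtocolSketch.api_name(0)   # => :produce
p ProtocolSketch.api_name(18)  # => :api_versions
p ProtocolSketch.api_name(99)  # => :unknown
```

Because `Hash#fetch` is given a default value, an unrecognized key yields `:unknown` instead of raising `KeyError`, which suits logging and instrumentation where an unexpected key should not abort the caller.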

.handle_error(error_code) ⇒ nil

Handles an error code by either doing nothing (if there was no error) or by raising an appropriate exception.

Parameters:

  • error_code (Integer)

Returns:

  • (nil)

Raises:

  • the exception class that ERRORS maps to error_code, or UnknownError if the code is not in ERRORS

# File 'lib/kafka/protocol.rb', line 96

def self.handle_error(error_code)
  if error_code == 0
    # No errors, yay!
  elsif error = ERRORS[error_code]
    raise error
  else
    raise UnknownError, "Unknown error with code #{error_code}"
  end
end
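
The three branches above (no error, mapped error, unmapped error) can be exercised in isolation. This is a minimal sketch under stated assumptions: `ErrorSketch`, its small exception hierarchy, and the `outcome` helper are hypothetical stand-ins defined here so the example runs without the gem; only the error codes and class names are taken from the ERRORS table.

```ruby
# Stand-in for illustration only: a three-entry copy of the documented
# ERRORS table plus locally defined exception classes.
module ErrorSketch
  class Error < StandardError; end
  class UnknownError < Error; end
  class OffsetOutOfRange < Error; end
  class LeaderNotAvailable < Error; end

  ERRORS = {
    -1 => UnknownError,
    1 => OffsetOutOfRange,
    5 => LeaderNotAvailable,
  }.freeze

  # Same logic as the documented method, written with early returns.
  def self.handle_error(error_code)
    return if error_code == 0 # code 0 means no error

    error = ERRORS[error_code]
    raise error if error

    raise UnknownError, "Unknown error with code #{error_code}"
  end

  # Hypothetical helper: reports which branch was taken as a symbol.
  def self.outcome(error_code)
    handle_error(error_code)
    :ok
  rescue Error => e
    e.class.name.split("::").last.to_sym
  end
end

p ErrorSketch.outcome(0)    # => :ok
p ErrorSketch.outcome(5)    # => :LeaderNotAvailable
p ErrorSketch.outcome(999)  # => :UnknownError
```

A caller would typically pass the error code decoded from a response and rescue the specific exception classes it knows how to recover from, letting the rest propagate.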