Class: Kafka::SaslPlainAuthenticator

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/sasl_plain_authenticator.rb

Constant Summary collapse

PLAIN_IDENT =
"PLAIN"

Instance Method Summary collapse

Constructor Details

#initialize(connection:, logger:, authzid:, username:, password:) ⇒ SaslPlainAuthenticator

Returns a new instance of SaslPlainAuthenticator.



5
6
7
8
9
10
11
# File 'lib/kafka/sasl_plain_authenticator.rb', line 5

# Stores the collaborators and credentials for later use; performs no I/O.
#
# @param connection the object kept in @connection
# @param logger the object kept in @logger
# @param authzid the authorization identity kept in @authzid
# @param username the user name kept in @username
# @param password the password kept in @password
def initialize(connection:, logger:, authzid:, username:, password:)
  # Collaborators first, then the three credential fields.
  @connection, @logger = connection, logger
  @authzid, @username, @password = authzid, username, password
end

Instance Method Details

#authenticate! ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/kafka/sasl_plain_authenticator.rb', line 13

# Runs the SASL PLAIN exchange over @connection.
#
# Sends a SaslHandshakeRequest for PLAIN_IDENT, verifies that the response
# has error code 0 and lists PLAIN_IDENT among its enabled mechanisms, then
# writes the authzid, username and password (NUL-separated) through the
# connection's encoder and reads a reply through its decoder.
#
# @raise [Kafka::Error] if the handshake response does not allow PLAIN_IDENT,
#   if the decoder yields a falsy reply, or if reading the reply raises
#   Errno::ETIMEDOUT or EOFError
# @return whatever @logger.debug returns
def authenticate!
  handshake = Kafka::Protocol::SaslHandshakeRequest.new(PLAIN_IDENT)
  response = @connection.send_request(handshake)

  # Encoder/decoder are fetched right after the handshake, before validating it.
  @encoder = @connection.encoder
  @decoder = @connection.decoder

  plain_enabled = response.error_code == 0 &&
                  response.enabled_mechanisms.include?(PLAIN_IDENT)
  raise Kafka::Error, "#{PLAIN_IDENT} is not supported." unless plain_enabled

  # SASL PLAIN payload: the three fields, stringified and joined by NUL bytes.
  fields = [@authzid, @username, @password].map(&:to_s)
  payload = fields.join("\000").force_encoding('utf-8')
  @encoder.write_bytes(payload)

  reply =
    begin
      @decoder.bytes
    rescue Errno::ETIMEDOUT, EOFError => e
      raise Kafka::Error, "SASL PLAIN authentication failed: #{e.message}"
    end
  raise Kafka::Error, 'SASL PLAIN authentication failed: unknown error' unless reply

  @logger.debug 'SASL PLAIN authentication successful.'
end