Class: Kafka::SaslAuthenticator

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/sasl_authenticator.rb

Instance Method Summary collapse

Constructor Details

#initialize(logger:, sasl_gssapi_principal:, sasl_gssapi_keytab:, sasl_plain_authzid:, sasl_plain_username:, sasl_plain_password:, sasl_scram_username:, sasl_scram_password:, sasl_scram_mechanism:) ⇒ SaslAuthenticator

Returns a new instance of SaslAuthenticator.



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/kafka/sasl_authenticator.rb', line 7

# Creates one handler object per SASL mechanism (PLAIN, GSSAPI, SCRAM) from
# the supplied settings. Every handler is given the same logger. The handlers
# are built in the order PLAIN, GSSAPI, SCRAM.
#
# @param logger logger stored on the authenticator and passed to each handler
# @param sasl_gssapi_principal passed to Sasl::Gssapi as +principal+
# @param sasl_gssapi_keytab passed to Sasl::Gssapi as +keytab+
# @param sasl_plain_authzid passed to Sasl::Plain as +authzid+
# @param sasl_plain_username passed to Sasl::Plain as +username+
# @param sasl_plain_password passed to Sasl::Plain as +password+
# @param sasl_scram_username passed to Sasl::Scram as +username+
# @param sasl_scram_password passed to Sasl::Scram as +password+
# @param sasl_scram_mechanism passed to Sasl::Scram as +mechanism+
def initialize(logger:, sasl_gssapi_principal:, sasl_gssapi_keytab:,
               sasl_plain_authzid:, sasl_plain_username:, sasl_plain_password:,
               sasl_scram_username:, sasl_scram_password:, sasl_scram_mechanism:)
  @logger = logger

  plain_settings = {
    authzid: sasl_plain_authzid,
    username: sasl_plain_username,
    password: sasl_plain_password,
    logger: @logger
  }
  @plain = Sasl::Plain.new(**plain_settings)

  gssapi_settings = {
    principal: sasl_gssapi_principal,
    keytab: sasl_gssapi_keytab,
    logger: @logger
  }
  @gssapi = Sasl::Gssapi.new(**gssapi_settings)

  scram_settings = {
    username: sasl_scram_username,
    password: sasl_scram_password,
    mechanism: sasl_scram_mechanism,
    logger: @logger
  }
  @scram = Sasl::Scram.new(**scram_settings)
end

Instance Method Details

#authenticate!(connection) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/kafka/sasl_authenticator.rb', line 33

# Authenticates +connection+ with the first mechanism that reports itself as
# configured, checking GSSAPI, then PLAIN, then SCRAM. Returns without doing
# anything when none is configured.
#
# A SaslHandshakeRequest naming the mechanism is sent first; authentication
# only proceeds if the response has error code 0 and lists that mechanism.
#
# @param connection object responding to +send_request+, +to_s+, +encoder+
#   and +decoder+
# @raise [Kafka::Error] if the handshake response rejects the mechanism
# @return nil when no mechanism is configured, otherwise the result of the
#   mechanism's own +authenticate!+
def authenticate!(connection)
  chosen = [@gssapi, @plain, @scram].find { |candidate| candidate.configured? }
  return unless chosen

  name = chosen.ident
  handshake = Kafka::Protocol::SaslHandshakeRequest.new(name)
  reply = connection.send_request(handshake)

  # The mechanism list is only consulted when the error code is 0.
  accepted = reply.error_code == 0 && reply.enabled_mechanisms.include?(name)
  raise Kafka::Error, "#{name} is not supported." unless accepted

  chosen.authenticate!(connection.to_s, connection.encoder, connection.decoder)
end