Class: Kafka::Sasl::AwsMskIam
- Inherits: Object
  - Object
  - Kafka::Sasl::AwsMskIam
- Defined in: lib/kafka/sasl/awsmskiam.rb
Constant Summary
- AWS_MSK_IAM = "AWS_MSK_IAM"
Instance Method Summary
- #authenticate!(host, encoder, decoder) ⇒ Object
- #configured? ⇒ Boolean
- #ident ⇒ Object
- #initialize(aws_region:, access_key_id:, secret_key_id:, logger:) ⇒ AwsMskIam (constructor): a new instance of AwsMskIam.
Constructor Details
#initialize(aws_region:, access_key_id:, secret_key_id:, logger:) ⇒ AwsMskIam
Returns a new instance of AwsMskIam.
# File 'lib/kafka/sasl/awsmskiam.rb', line 12

def initialize(aws_region:, access_key_id:, secret_key_id:, logger:)
  @semaphore = Mutex.new
  @aws_region = aws_region
  @access_key_id = access_key_id
  @secret_key_id = secret_key_id
  @logger = TaggedLogger.new(logger)
end
Instance Method Details
#authenticate!(host, encoder, decoder) ⇒ Object
# File 'lib/kafka/sasl/awsmskiam.rb', line 29

def authenticate!(host, encoder, decoder)
  @logger.debug "Authenticating #{@access_key_id} with SASL #{AWS_MSK_IAM}"

  # The payload is built for the bare hostname, so drop any ":port" suffix.
  host_without_port = host.split(':', -1).first

  time_now = Time.now.utc

  msg = authentication_payload(host: host_without_port, time_now: time_now)
  @logger.debug "Sending first client SASL AWS_MSK_IAM message:"
  @logger.debug msg
  encoder.write_bytes(msg)

  begin
    @logger.debug "Decoding first server SASL AWS_MSK_IAM message"
    @server_first_message = decoder.bytes
    @logger.debug "Received first server SASL AWS_MSK_IAM message: #{@server_first_message}"

    # A nil reply is treated as a failed handshake.
    raise Kafka::Error, "SASL AWS_MSK_IAM authentication failed: unknown error" unless @server_first_message
  rescue Errno::ETIMEDOUT, EOFError => e
    @logger.error e.backtrace
    raise Kafka::Error, "SASL AWS_MSK_IAM authentication failed: #{e.message}"
  end

  @logger.debug "SASL #{AWS_MSK_IAM} authentication successful"
end
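The control flow of #authenticate! can be exercised without a broker. The following is a minimal sketch, not the library's implementation: SketchError stands in for Kafka::Error, StubEncoder and StubDecoder are hypothetical test doubles exposing only the write_bytes and bytes calls seen above, and the payload string is a placeholder for whatever the private authentication_payload method returns.

```ruby
# Stand-in for Kafka::Error so the sketch runs without the ruby-kafka gem.
class SketchError < StandardError; end

# Hypothetical double: records whatever authenticate! would write.
class StubEncoder
  attr_reader :written

  def initialize
    @written = []
  end

  def write_bytes(bytes)
    @written << bytes
  end
end

# Hypothetical double: returns a canned reply, or raises it if it is an exception class.
class StubDecoder
  def initialize(reply)
    @reply = reply
  end

  def bytes
    raise @reply if @reply.is_a?(Class) && @reply <= Exception
    @reply
  end
end

# Mirrors the shape of authenticate!: strip the port, send one message,
# read one reply, and wrap timeouts and EOFs in a single error type.
def sketch_authenticate(host, encoder, decoder)
  host_without_port = host.split(":", -1).first
  encoder.write_bytes("payload-for-#{host_without_port}") # placeholder payload

  begin
    server_first_message = decoder.bytes
    raise SketchError, "authentication failed: unknown error" unless server_first_message
  rescue Errno::ETIMEDOUT, EOFError => e
    raise SketchError, "authentication failed: #{e.message}"
  end

  server_first_message
end

encoder = StubEncoder.new
sketch_authenticate("broker-1:9098", encoder, StubDecoder.new("ok"))
encoder.written # the single placeholder payload, built for "broker-1"
```

A nil reply and a dropped connection (EOFError) both surface to the caller as the same error class, which matches how the listing above funnels both cases into Kafka::Error.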
#configured? ⇒ Boolean
# File 'lib/kafka/sasl/awsmskiam.rb', line 25

def configured?
  @aws_region && @access_key_id && @secret_key_id
end
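Although the signature is documented as returning Boolean, a chain of && in Ruby evaluates to its last operand or to the first falsy one, so the result is truthy or falsy rather than strictly true or false. The sketch below illustrates this with ConfigSketch, a hypothetical stand-in that copies only the three fields and the predicate; the credential values are made up.

```ruby
# Hypothetical stand-in; not the real Kafka::Sasl::AwsMskIam class.
class ConfigSketch
  def initialize(aws_region:, access_key_id:, secret_key_id:)
    @aws_region = aws_region
    @access_key_id = access_key_id
    @secret_key_id = secret_key_id
  end

  # Same expression as the documented configured? method.
  def configured?
    @aws_region && @access_key_id && @secret_key_id
  end
end

full = ConfigSketch.new(aws_region: "us-east-1", access_key_id: "AKIDEXAMPLE", secret_key_id: "secret")
partial = ConfigSketch.new(aws_region: "us-east-1", access_key_id: nil, secret_key_id: "secret")

full.configured?    # the last operand, "secret" (truthy)
partial.configured? # nil (falsy), because access_key_id is missing
!!full.configured?  # true, if a strict Boolean is needed
```

This makes no difference in an if or unless, but a caller comparing the result with == true would see a mismatch.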
#ident ⇒ Object
# File 'lib/kafka/sasl/awsmskiam.rb', line 21

def ident
  AWS_MSK_IAM
end