Class: Kafka::ConnectionBuilder

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/connection_builder.rb

Instance Method Summary

Constructor Details

#initialize(client_id:, logger:, instrumenter:, connect_timeout:, socket_timeout:, ssl_context:, sasl_gssapi_principal:, sasl_gssapi_keytab:, sasl_plain_authzid:, sasl_plain_username:, sasl_plain_password:) ⇒ ConnectionBuilder

Returns a new instance of ConnectionBuilder.



6
7
8
9
10
11
12
13
14
15
16
17
18
# File 'lib/kafka/connection_builder.rb', line 6

# Stores the supplied settings in instance variables.
#
# Every keyword is required. Values are kept exactly as given; nothing is
# validated, copied, or defaulted here.
#
# @param client_id forwarded to Connection.new by #build_connection
# @param logger forwarded to Connection.new by #build_connection
# @param instrumenter forwarded to Connection.new by #build_connection
# @param connect_timeout forwarded to Connection.new by #build_connection
# @param socket_timeout forwarded to Connection.new by #build_connection
# @param ssl_context forwarded to Connection.new by #build_connection
# @param sasl_gssapi_principal stored for SASL GSSAPI authentication
# @param sasl_gssapi_keytab stored for SASL GSSAPI authentication
# @param sasl_plain_authzid stored for SASL PLAIN authentication
# @param sasl_plain_username stored for SASL PLAIN authentication
# @param sasl_plain_password stored for SASL PLAIN authentication
def initialize(client_id:, logger:, instrumenter:, connect_timeout:, socket_timeout:, ssl_context:, sasl_gssapi_principal:, sasl_gssapi_keytab:, sasl_plain_authzid:, sasl_plain_username:, sasl_plain_password:)
  # Settings handed to every Connection created by #build_connection.
  @client_id       = client_id
  @connect_timeout = connect_timeout
  @socket_timeout  = socket_timeout
  @ssl_context     = ssl_context
  @logger          = logger
  @instrumenter    = instrumenter

  # SASL GSSAPI settings.
  @sasl_gssapi_principal = sasl_gssapi_principal
  @sasl_gssapi_keytab    = sasl_gssapi_keytab

  # SASL PLAIN settings.
  @sasl_plain_authzid  = sasl_plain_authzid
  @sasl_plain_username = sasl_plain_username
  @sasl_plain_password = sasl_plain_password
end

Instance Method Details

#build_connection(host, port) ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/kafka/connection_builder.rb', line 20

# Creates a Connection for the given host and port using the settings held
# by this builder, then runs SASL authentication on it when one of the
# mechanisms is enabled.
#
# GSSAPI is checked first; PLAIN is only considered when GSSAPI is not
# selected. When neither predicate is true the connection is returned
# without any authentication step.
#
# @param host passed through to Connection.new as +host:+
# @param port passed through to Connection.new as +port:+
# @return the newly created Connection
def build_connection(host, port)
  Connection.new(
    host: host,
    port: port,
    client_id: @client_id,
    connect_timeout: @connect_timeout,
    socket_timeout: @socket_timeout,
    logger: @logger,
    instrumenter: @instrumenter,
    ssl_context: @ssl_context
  ).tap do |conn|
    # The predicates are evaluated only after the connection exists.
    if authenticate_using_sasl_gssapi?
      sasl_gssapi_authenticate(conn)
    elsif authenticate_using_sasl_plain?
      sasl_plain_authenticate(conn)
    end
  end
end