Method: Kafka::Client#initialize

Defined in:
lib/kafka/client.rb

#initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_timeout: nil, socket_timeout: nil, ssl_ca_cert_file_path: nil, ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil, ssl_client_cert_key_password: nil, ssl_client_cert_chain: nil, sasl_gssapi_principal: nil, sasl_gssapi_keytab: nil, sasl_plain_authzid: '', sasl_plain_username: nil, sasl_plain_password: nil, sasl_scram_username: nil, sasl_scram_password: nil, sasl_scram_mechanism: nil, sasl_aws_msk_iam_access_key_id: nil, sasl_aws_msk_iam_secret_key_id: nil, sasl_aws_msk_iam_aws_region: nil, sasl_aws_msk_iam_session_token: nil, sasl_over_ssl: true, ssl_ca_certs_from_system: false, partitioner: nil, sasl_oauth_token_provider: nil, ssl_verify_hostname: true, resolve_seed_brokers: false) ⇒ Client

Initializes a new Kafka client.

Parameters:

  • seed_brokers (Array<String>, String)

    the list of brokers used to initialize the client. Either an Array of connections, or a comma-separated string of connections. A connection can either be a string of "host:port" or a full URI with a scheme. If there's a scheme, it's ignored and only host/port are used.

  • client_id (String) (defaults to: "ruby-kafka")

    the identifier for this application.

  • logger (Logger) (defaults to: nil)

    the logger that should be used by the client.

  • connect_timeout (Integer, nil) (defaults to: nil)

    the timeout setting for connecting to brokers. See BrokerPool#initialize.

  • socket_timeout (Integer, nil) (defaults to: nil)

    the timeout setting for socket connections. See BrokerPool#initialize.

  • ssl_ca_cert (String, Array<String>, nil) (defaults to: nil)

    a PEM encoded CA cert, or an Array of PEM encoded CA certs, to use with an SSL connection.

  • ssl_ca_cert_file_path (String, Array<String>, nil) (defaults to: nil)

    a path on the filesystem, or an Array of paths, to PEM encoded CA cert(s) to use with an SSL connection.

  • ssl_client_cert (String, nil) (defaults to: nil)

    a PEM encoded client cert to use with an SSL connection. Must be used in combination with ssl_client_cert_key.

  • ssl_client_cert_key (String, nil) (defaults to: nil)

    a PEM encoded client cert key to use with an SSL connection. Must be used in combination with ssl_client_cert.

  • ssl_client_cert_key_password (String, nil) (defaults to: nil)

    the password required to read the ssl_client_cert_key. Must be used in combination with ssl_client_cert_key.

  • sasl_gssapi_principal (String, nil) (defaults to: nil)

    a KRB5 principal

  • sasl_gssapi_keytab (String, nil) (defaults to: nil)

    a KRB5 keytab filepath

  • sasl_scram_username (String, nil) (defaults to: nil)

    SCRAM username

  • sasl_scram_password (String, nil) (defaults to: nil)

    SCRAM password

  • sasl_scram_mechanism (String, nil) (defaults to: nil)

    the SCRAM mechanism, either "sha256" or "sha512"

  • sasl_over_ssl (Boolean) (defaults to: true)

    whether to enforce SSL with SASL

  • ssl_ca_certs_from_system (Boolean) (defaults to: false)

    whether to use the CA certs from the system's default certificate store.

  • partitioner (Partitioner, nil) (defaults to: nil)

    the partitioner that should be used by the client.

  • sasl_oauth_token_provider (Object, nil) (defaults to: nil)

    OAuthBearer Token Provider instance that implements method token. See Sasl::OAuth#initialize

  • ssl_verify_hostname (Boolean) (defaults to: true)

    whether to verify that the host serving the SSL certificate and the signing chain of the certificate have the correct domains based on the CA certificate

  • resolve_seed_brokers (Boolean) (defaults to: false)

    whether to resolve each hostname of the seed brokers. If a broker is resolved to multiple IP addresses, the client tries to connect to each of the addresses until it can connect.



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/kafka/client.rb', line 83

# Initializes a new Kafka client.
#
# Wires up the client's collaborators in a fixed order: logger, instrumenter,
# seed broker list, SSL context, SASL authenticator, connection builder,
# cluster and finally the partitioner.
#
# @param seed_brokers [Array<String>, String] the brokers used to initialize
#   the client; handed to `normalize_seed_brokers` before being stored.
# @param client_id [String] the identifier for this application; given to
#   both the instrumenter and the connection builder.
# @param logger [Logger, nil] the logger to use; wrapped in a TaggedLogger.
# @param connect_timeout [Integer, nil] forwarded to the connection builder.
# @param socket_timeout [Integer, nil] forwarded to the connection builder.
# @param ssl_ca_cert_file_path [String, Array<String>, nil] path(s) to PEM
#   encoded CA cert(s) to use with an SSL connection.
# @param ssl_ca_cert [String, Array<String>, nil] PEM encoded CA cert(s) to
#   use with an SSL connection.
# @param ssl_client_cert [String, nil] a PEM encoded client cert.
# @param ssl_client_cert_key [String, nil] a PEM encoded client cert key.
# @param ssl_client_cert_key_password [String, nil] the password required to
#   read the client cert key.
# @param ssl_client_cert_chain [Object, nil] forwarded to SslContext.build as
#   `client_cert_chain`.
# @param sasl_gssapi_principal [String, nil] a KRB5 principal.
# @param sasl_gssapi_keytab [String, nil] a KRB5 keytab filepath.
# @param sasl_plain_authzid [String] forwarded to the SASL authenticator.
# @param sasl_plain_username [String, nil] forwarded to the SASL authenticator.
# @param sasl_plain_password [String, nil] forwarded to the SASL authenticator.
# @param sasl_scram_username [String, nil] SCRAM username.
# @param sasl_scram_password [String, nil] SCRAM password.
# @param sasl_scram_mechanism [String, nil] SCRAM mechanism.
# @param sasl_aws_msk_iam_access_key_id [Object, nil] forwarded to the SASL
#   authenticator.
# @param sasl_aws_msk_iam_secret_key_id [Object, nil] forwarded to the SASL
#   authenticator.
# @param sasl_aws_msk_iam_aws_region [Object, nil] forwarded to the SASL
#   authenticator.
# @param sasl_aws_msk_iam_session_token [Object, nil] forwarded to the SASL
#   authenticator.
# @param sasl_over_ssl [Boolean] whether to enforce SSL with SASL.
# @param ssl_ca_certs_from_system [Boolean] whether to use the CA certs from
#   the system's default certificate store.
# @param partitioner [Partitioner, nil] the partitioner to use; a fresh
#   `Partitioner.new` is created when this is nil (or false).
# @param sasl_oauth_token_provider [Object, nil] forwarded to the SASL
#   authenticator.
# @param ssl_verify_hostname [Boolean] forwarded to SslContext.build as
#   `verify_hostname`.
# @param resolve_seed_brokers [Boolean] stored as-is on the client.
# @raise [ArgumentError] when the SASL authenticator reports itself enabled,
#   `sasl_over_ssl` is truthy, and SslContext.build returned nil.
# @return [Client]
def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_timeout: nil, socket_timeout: nil,
               ssl_ca_cert_file_path: nil, ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil,
               ssl_client_cert_key_password: nil, ssl_client_cert_chain: nil, sasl_gssapi_principal: nil,
               sasl_gssapi_keytab: nil, sasl_plain_authzid: '', sasl_plain_username: nil, sasl_plain_password: nil,
               sasl_scram_username: nil, sasl_scram_password: nil, sasl_scram_mechanism: nil,
               sasl_aws_msk_iam_access_key_id: nil,
               sasl_aws_msk_iam_secret_key_id: nil,
               sasl_aws_msk_iam_aws_region: nil,
               sasl_aws_msk_iam_session_token: nil,
               sasl_over_ssl: true, ssl_ca_certs_from_system: false, partitioner: nil, sasl_oauth_token_provider: nil, ssl_verify_hostname: true,
               resolve_seed_brokers: false)
  # Basic client state. The tagged logger is created first because the SASL
  # authenticator and the connection builder below both receive it.
  @logger = TaggedLogger.new(logger)
  @instrumenter = Instrumenter.new(client_id: client_id)
  @seed_brokers = normalize_seed_brokers(seed_brokers)
  @resolve_seed_brokers = resolve_seed_brokers

  # The `ssl_` prefix is dropped from each option name when handing the
  # settings to SslContext.build.
  tls_settings = {
    ca_cert_file_path: ssl_ca_cert_file_path,
    ca_cert: ssl_ca_cert,
    client_cert: ssl_client_cert,
    client_cert_key: ssl_client_cert_key,
    client_cert_key_password: ssl_client_cert_key_password,
    client_cert_chain: ssl_client_cert_chain,
    ca_certs_from_system: ssl_ca_certs_from_system,
    verify_hostname: ssl_verify_hostname
  }
  tls_context = SslContext.build(**tls_settings)

  # SASL options keep their full names; the authenticator also gets the
  # tagged logger.
  sasl_settings = {
    sasl_gssapi_principal: sasl_gssapi_principal,
    sasl_gssapi_keytab: sasl_gssapi_keytab,
    sasl_plain_authzid: sasl_plain_authzid,
    sasl_plain_username: sasl_plain_username,
    sasl_plain_password: sasl_plain_password,
    sasl_scram_username: sasl_scram_username,
    sasl_scram_password: sasl_scram_password,
    sasl_scram_mechanism: sasl_scram_mechanism,
    sasl_aws_msk_iam_access_key_id: sasl_aws_msk_iam_access_key_id,
    sasl_aws_msk_iam_secret_key_id: sasl_aws_msk_iam_secret_key_id,
    sasl_aws_msk_iam_aws_region: sasl_aws_msk_iam_aws_region,
    sasl_aws_msk_iam_session_token: sasl_aws_msk_iam_session_token,
    sasl_oauth_token_provider: sasl_oauth_token_provider,
    logger: @logger
  }
  authenticator = SaslAuthenticator.new(**sasl_settings)

  # Refuse SASL without an SSL context unless the caller opted out via
  # `sasl_over_ssl: false`.
  raise ArgumentError, "SASL authentication requires that SSL is configured" if authenticator.enabled? && sasl_over_ssl && tls_context.nil?

  @connection_builder = ConnectionBuilder.new(
    client_id: client_id,
    connect_timeout: connect_timeout,
    socket_timeout: socket_timeout,
    ssl_context: tls_context,
    logger: @logger,
    instrumenter: @instrumenter,
    sasl_authenticator: authenticator
  )

  @cluster = initialize_cluster
  @partitioner = partitioner || Partitioner.new
end