Module: LogStash::PluginMixins::Kafka::AvroSchemaRegistry

Included in:
Inputs::Kafka
Defined in:
lib/logstash/plugin_mixins/kafka/avro_schema_registry.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(base) ⇒ Object



6
7
8
9
# File 'lib/logstash/plugin_mixins/kafka/avro_schema_registry.rb', line 6

# Hook run with the class or module (`base`) that includes this mixin.
#
# Extends `base` with this module, then calls
# `setup_schema_registry_config` on `base`.
#
# @param base [Object] the includer; it must respond to `extend`
def self.included(base)
  mixin = self
  base.extend(mixin)
  base.setup_schema_registry_config
end

Instance Method Details

#check_schema_registry_parameters ⇒ Object



50
51
52
53
54
55
56
57
# File 'lib/logstash/plugin_mixins/kafka/avro_schema_registry.rb', line 50

# Runs the schema registry checks. Does nothing (returns nil) when
# `@schema_registry_url` is not set.
#
# In order: checks for conflicting settings, splits the configured proxy
# into `@schema_registry_proxy_host` / `@schema_registry_proxy_port`,
# checks the key/secret pair, and finally, only when
# `schema_registry_validation?` is true, checks connectivity and subjects.
#
# @return [Object, nil] the result of the connectivity check when it runs,
#   otherwise nil
def check_schema_registry_parameters
  return unless @schema_registry_url

  check_for_schema_registry_conflicts

  proxy_host, proxy_port = split_proxy_into_host_and_port(schema_registry_proxy)
  @schema_registry_proxy_host = proxy_host
  @schema_registry_proxy_port = proxy_port

  check_for_key_and_secret
  return unless schema_registry_validation?

  check_for_schema_registry_connectivity_and_subjects
end

#schema_registry_validation? ⇒ Boolean

Returns:

  • (Boolean)


59
60
61
62
63
64
# File 'lib/logstash/plugin_mixins/kafka/avro_schema_registry.rb', line 59

# Whether the schema registry should be pre-validated.
#
# Validation is off when `schema_registry_validation` is 'skip', and also
# when `using_kerberos?` is true, because pre-validation doesn't support
# kerberos. `using_kerberos?` is not consulted when the setting is 'skip'.
#
# @return [Boolean]
def schema_registry_validation?
  skip_requested = schema_registry_validation.to_s == 'skip'
  !skip_requested && !using_kerberos?
end

#setup_schema_registry_config ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/logstash/plugin_mixins/kafka/avro_schema_registry.rb', line 11

# Declares every `schema_registry_*` setting through the `config` DSL
# method of the receiver.
def setup_schema_registry_config
  # Key used to access the Schema Registry.
  config :schema_registry_key, validate: :string

  # Secret used to access the Schema Registry.
  config :schema_registry_secret, validate: :password

  # Endpoint of the Schema Registry.
  # Setting it permits use of the Avro Kafka deserializer, which retrieves
  # the schema of each Avro message from a schema registry instance.
  # When this option is set, neither `value_deserializer_class` nor
  # `topics_pattern` may be set.
  config :schema_registry_url, validate: :uri

  # Proxy through which the Schema Registry service instance is reached.
  config :schema_registry_proxy, validate: :uri

  # Keystore path, for when the Schema Registry requires client
  # authentication.
  config :schema_registry_ssl_keystore_location, validate: :string

  # Password of the keystore.
  config :schema_registry_ssl_keystore_password, validate: :password

  # Type of the keystore.
  config :schema_registry_ssl_keystore_type, validate: %w[jks PKCS12], default: 'jks'

  # Truststore path used to validate the Schema Registry's certificate.
  config :schema_registry_ssl_truststore_location, validate: :string

  # Password of the truststore.
  config :schema_registry_ssl_truststore_password, validate: :password

  # Type of the truststore.
  config :schema_registry_ssl_truststore_type, validate: %w[jks PKCS12], default: 'jks'

  # Set to 'skip' to bypass validation of the schema registry during
  # registration. This can be useful with certificate-based auth.
  config :schema_registry_validation, validate: %w[auto skip], default: 'auto'
end

#using_kerberos? ⇒ Boolean

Returns:

  • (Boolean)


66
67
68
# File 'lib/logstash/plugin_mixins/kafka/avro_schema_registry.rb', line 66

# True when `security_protocol` is "SASL_PLAINTEXT" or "SASL_SSL".
#
# NOTE(review): every SASL security protocol is treated as Kerberos here,
# whatever SASL mechanism is in use. Confirm that this is intended.
#
# @return [Boolean]
def using_kerberos?
  %w[SASL_PLAINTEXT SASL_SSL].include?(security_protocol)
end