Logstash Codec - Avro Schema Registry

v1.2.0

This plugin is used to serialize Logstash events as Avro datums, as well as deserializing Avro datums into Logstash events.

Decode/encode Avro records as Logstash events using the associated Avro schema from a Confluent schema registry. (https://github.com/confluentinc/schema-registry)

Installation

logstash-plugin install logstash-codec-avro_schema_registry

Decoding (input)

When this codec is used to decode the input, you may pass the following options:

  • endpoint - always required.
  • username - optional.
  • password - optional.
  • tag_on_failure - tag events with _avroparsefailure when decode fails

If the input stream is binary encoded, you should use the ByteArrayDeserializer in the Kafka input config.

Encoding (output)

This codec uses the Confluent schema registry to register a schema and encode the data in Avro using schema_id lookups.

When this codec is used to encode, you may pass the following options:

  • endpoint - always required.
  • username - optional.
  • password - optional.
  • schema_id - when provided, no other options are required.
  • subject_name - required when there is no schema_id.
  • schema_version - when provided, the schema will be looked up in the registry.
  • schema_uri - when provided, JSON schema is loaded from URL or file.
  • schema_string - required when there is no schema_id, schema_version or schema_uri
  • check_compatibility - will check schema compatibility before encoding.
  • register_schema - will register the JSON schema if it does not exist.
  • binary_encoded - will output the encoded event as a ByteArray. Requires the ByteArraySerializer to be set in the Kafka output config.
  • base64_encoded - will output the encoded event as a base64 string.
  • client_certificate - Client TLS certificate for mutual TLS
  • client_key - Client TLS key for mutual TLS
  • ca_certificate - CA Certificate
  • verify_mode - SSL Verify modes. Valid options are verify_none, verify_peer, verify_client_once , and verify_fail_if_no_peer_cert. Default is verify_peer

Usage

Basic usage with Kafka input and output.

input {
  kafka {
    ...
    codec => avro_schema_registry {
      endpoint => "http://schemas.example.com"
    }
    value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
  }
}
filter {
  ...
}
output {
  kafka {
    ...
    codec => avro_schema_registry {
      endpoint => "http://schemas.example.com"
      subject_name => "my_kafka_subject_name"
      schema_uri => "/app/my_kafka_subject.avsc"
      register_schema => true
    }
    value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
  }
}

Using signed certificate for registry authentication

output {
  kafka {
    ...
    codec => avro_schema_registry {
      endpoint => "http://schemas.example.com"
      schema_id => 47
      client_key          => "./client.key"
      client_certificate  => "./client.crt"
      ca_certificate      => "./ca.pem"
      verify_mode         => "verify_peer"
    }
    value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
  }
}