Class: LogStash::Codecs::AvroSchemaRegistry

Inherits: Base
Defined in: lib/logstash/codecs/avro_schema_registry.rb

Overview

Logstash Codec - Avro Schema Registry

This plugin is used to serialize Logstash events as Avro datums, as well as to deserialize Avro datums into Logstash events.

Decode/encode Avro records as Logstash events using the associated Avro schema from a Confluent schema registry (github.com/confluentinc/schema-registry).

Decoding (input)

When this codec is used to decode the input, you may pass the following options:

  • `endpoint` - always required.

  • `username` - optional.

  • `password` - optional.

  • `tag_on_failure` - tag events with `_avroparsefailure` when decoding fails.

If the input stream is binary encoded, you should use the `ByteArrayDeserializer` in the Kafka input config, as in the sketch below.
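A minimal decoding sketch, assuming a registry at schemas.example.com (the URL is illustrative):

[source,ruby]
----
input {
  kafka {
    codec => avro_schema_registry {
      endpoint => "http://schemas.example.com"  # illustrative registry URL
      tag_on_failure => true                    # emit _avroparsefailure events instead of raising
    }
    value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
  }
}
----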

Encoding (output)

This codec uses the Confluent schema registry to register a schema and encode the data in Avro using schema_id lookups.

When this codec is used to encode, you may pass the following options:

  • `endpoint` - always required.

  • `username` - optional.

  • `password` - optional.

  • `schema_id` - when provided, no other schema options are required.

  • `subject_name` - required when there is no `schema_id`.

  • `schema_version` - when provided, the schema will be looked up in the registry (see the sketch after this list).

  • `schema_uri` - when provided, the JSON schema is loaded from a URL or file.

  • `schema_string` - required when there is no `schema_id`, `schema_version`, or `schema_uri`.

  • `check_compatibility` - will check schema compatibility before encoding.

  • `register_schema` - will register the JSON schema if it does not exist.

  • `binary_encoded` - will output the encoded event as a ByteArray.

  • `base64_encoded` - will output in base64 encoding; default is true. Requires the `ByteArraySerializer` to be set in the Kafka output config.

  • `client_certificate` - client TLS certificate for mutual TLS.

  • `client_key` - client TLS key for mutual TLS.

  • `ca_certificate` - CA certificate.

  • `verify_mode` - SSL verify mode. Valid options are `verify_none`, `verify_peer`, `verify_client_once`, and `verify_fail_if_no_peer_cert`. Default is `verify_peer`.
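As a sketch of the registry-lookup path (subject name and version are illustrative), an output can encode against an already-registered schema version:

[source,ruby]
----
output {
  kafka {
    codec => avro_schema_registry {
      endpoint => "http://schemas.example.com"  # illustrative registry URL
      subject_name => "my_subject"              # hypothetical subject
      schema_version => 3                       # use this registered version's schema
    }
    value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
  }
}
----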

Usage

Example usage with Kafka input and output.

[source,ruby]
----
input {
  kafka {
    ...
    codec => avro_schema_registry {
      endpoint => "http://schemas.example.com"
    }
    value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
  }
}

filter {
  ...
}

output {
  kafka {
    ...
    codec => avro_schema_registry {
      endpoint => "http://schemas.example.com"
      subject_name => "my_kafka_subject_name"
      schema_uri => "/app/my_kafka_subject.avsc"
      register_schema => true
    }
    value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
  }
}
----


Using a signed certificate for registry authentication

[source,ruby]
----
output {
  kafka {
    ...
    codec => avro_schema_registry {
      endpoint => "https://schemas.example.com"
      schema_id => 47
      client_key          => "./client.key"
      client_certificate  => "./client.crt"
      ca_certificate      => "./ca.pem"
      verify_mode         => "verify_peer"
    }
    value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
  }
}
----


Constant Summary

EXCLUDE_ALWAYS = ["@timestamp", "@version"]

Instance Method Details

#clean_event(event) ⇒ Object



# File 'lib/logstash/codecs/avro_schema_registry.rb', line 214

def clean_event(event)
  # Drop Logstash bookkeeping fields before Avro serialization.
  event_hash = event.to_hash
  event_hash.delete_if { |key, _| EXCLUDE_ALWAYS.include? key }
  event_hash
end
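Logstash adds @timestamp and @version to every event; clean_event strips them so they are not written into the Avro datum. A quick sketch (the field name is illustrative):

[source,ruby]
----
event = LogStash::Event.new("message" => "hello")
event.to_hash.keys.sort  # => ["@timestamp", "@version", "message"]
clean_event(event)       # => { "message" => "hello" }
----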

#decode(data) ⇒ Object



# File 'lib/logstash/codecs/avro_schema_registry.rb', line 221

def decode(data)
  if data.length < 5
    @logger.error('message is too small to decode')
  else
    # Accept base64-encoded input; fall back to raw bytes if base64 decoding fails.
    datum = StringIO.new(Base64.strict_decode64(data)) rescue StringIO.new(data)
    # Confluent wire format header: 1-byte magic, 4-byte big-endian schema ID.
    magic_byte, schema_id = datum.read(5).unpack("cI>")
    if magic_byte != MAGIC_BYTE
      @logger.error('message does not start with magic byte')
    else
      schema = get_schema(schema_id)
      decoder = Avro::IO::BinaryDecoder.new(datum)
      datum_reader = Avro::IO::DatumReader.new(schema)
      yield LogStash::Event.new(datum_reader.read(decoder))
    end
  end
rescue => e
  if tag_on_failure
    @logger.error("Avro parse error, original data now in message field", :error => e)
    yield LogStash::Event.new("message" => data, "tags" => ["_avroparsefailure"])
  else
    raise e
  end
end
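The header parsed above is the Confluent wire format: one magic byte (0), a 4-byte big-endian schema ID, then the Avro-encoded payload. A standalone framing sketch in plain Ruby (the payload bytes are placeholders):

[source,ruby]
----
avro_payload = "\x02hi".b                 # placeholder standing in for Avro-encoded bytes
framed = [0, 47].pack("cI>") + avro_payload

magic_byte, schema_id = framed.unpack("cI>")
magic_byte                                # => 0
schema_id                                 # => 47
framed.byteslice(5, framed.bytesize - 5)  # => the Avro payload
----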

#encode(event) ⇒ Object



# File 'lib/logstash/codecs/avro_schema_registry.rb', line 246

def encode(event)
  @write_schema_id ||= get_write_schema_id
  schema = get_schema(@write_schema_id)
  dw = Avro::IO::DatumWriter.new(schema)
  buffer = StringIO.new
  # Write the Confluent wire format header before the Avro payload.
  buffer.write(MAGIC_BYTE.chr)
  buffer.write([@write_schema_id].pack("I>"))
  encoder = Avro::IO::BinaryEncoder.new(buffer)
  dw.write(clean_event(event), encoder)
  if @binary_encoded
    @on_event.call(event, buffer.string.to_java_bytes)
  elsif @base64_encoded
    @on_event.call(event, Base64.strict_encode64(buffer.string))
  else
    @on_event.call(event, buffer.string)
  end
end

#get_schema(schema_id) ⇒ Object



# File 'lib/logstash/codecs/avro_schema_registry.rb', line 158

def get_schema(schema_id)
  # Cache parsed schemas so repeated IDs do not hit the registry again.
  unless @schemas.has_key?(schema_id)
    @schemas[schema_id] = Avro::Schema.parse(@client.schema(schema_id))
  end
  @schemas[schema_id]
end

#get_write_schema_id ⇒ Object



# File 'lib/logstash/codecs/avro_schema_registry.rb', line 175

def get_write_schema_id
  # If schema id is passed, just use that
  if @schema_id
    @schema_id

  else
    # subject_name is required
    if @subject_name == nil
      @logger.error('requires a subject_name')
    else
      subject = @client.subject(@subject_name)

      # If schema_version, load from subject API
      if @schema_version != nil
        schema = subject.version(@schema_version)

      # Otherwise, load schema json and check with registry
      else
        schema_json = load_schema_json

        # If not compatible, raise error
        if @check_compatibility
          unless subject.compatible?(schema_json)
            @logger.error('the schema json is not compatible with the subject. you should fix your schema or change the compatibility level.')
          end
        end

        if @register_schema
          subject.register_schema(schema_json) unless subject.schema_registered?(schema_json)
        end

        schema = subject.verify_schema(schema_json)
      end
      # Return schema id
      schema.id
    end
  end
end
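In short, the write schema resolves in this order: an explicit schema_id; otherwise a subject_name plus schema_version registry lookup; otherwise a local schema from schema_uri or schema_string, optionally compatibility-checked and registered. A sketch of the last path (subject name and schema are illustrative):

[source,ruby]
----
codec => avro_schema_registry {
  endpoint => "http://schemas.example.com"  # illustrative registry URL
  subject_name => "my_subject"              # hypothetical subject
  schema_string => '{"type":"record","name":"Doc","fields":[{"name":"message","type":"string"}]}'
  check_compatibility => true               # check compatibility against the subject before encoding
  register_schema => true                   # register the schema if not already present
}
----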

#load_schema_json ⇒ Object



# File 'lib/logstash/codecs/avro_schema_registry.rb', line 165

def load_schema_json
  if @schema_uri
    # Kernel#open (via open-uri) accepts both local file paths and URLs.
    open(@schema_uri).read
  elsif @schema_string
    @schema_string
  else
    @logger.error('you must supply a schema_uri or schema_string in the config')
  end
end

#register ⇒ Object



# File 'lib/logstash/codecs/avro_schema_registry.rb', line 142

def register
  # Build the registry client, with mutual TLS when a client certificate is configured.
  @client = if client_certificate != nil
    SchemaRegistry::Client.new(endpoint, username, password, SchemaRegistry::Client.connection_options(
      client_certificate: client_certificate,
      client_key: client_key,
      ca_certificate: ca_certificate,
      verify_mode: verify_mode
    ))
  else
    SchemaRegistry::Client.new(endpoint, username, password)
  end

  # Per-instance schema cache and memoized write schema ID.
  @schemas = Hash.new
  @write_schema_id = nil
end