Class: LogStash::Codecs::AvroSchemaRegistry
- Inherits: Base (Object → Base → LogStash::Codecs::AvroSchemaRegistry)
- Defined in: lib/logstash/codecs/avro_schema_registry.rb
Overview
Logstash Codec - Avro Schema Registry
This plugin serializes Logstash events as Avro datums and deserializes Avro datums into Logstash events.
Decode/encode Avro records as Logstash events using the associated Avro schema from a Confluent schema registry (github.com/confluentinc/schema-registry).
Decoding (input)
When this codec is used to decode the input, you may pass the following options:
- `endpoint` - always required.
- `username` - optional.
- `password` - optional.
- `tag_on_failure` - tag events with `_avroparsefailure` when decoding fails.
If the input stream is binary encoded, you should use the `ByteArrayDeserializer` in the Kafka input config.
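A minimal input sketch; the topic name is illustrative (the endpoint matches the examples below):
[source,ruby]
input {
  kafka {
    topics => ["my_topic"]
    codec => avro_schema_registry {
      endpoint => "http://schemas.example.com"
      tag_on_failure => true
    }
    value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
  }
}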
Encoding (output)
This codec uses the Confluent schema registry to register a schema and encode the data in Avro using schema_id lookups.
When this codec is used to encode, you may pass the following options:
- `endpoint` - always required.
- `username` - optional.
- `password` - optional.
- `schema_id` - when provided, no other options are required.
- `subject_name` - required when there is no `schema_id`.
- `schema_version` - when provided, the schema will be looked up in the registry.
- `schema_uri` - when provided, the JSON schema is loaded from a URL or file.
- `schema_string` - required when there is no `schema_id`, `schema_version`, or `schema_uri`.
- `check_compatibility` - will check schema compatibility before encoding.
- `register_schema` - will register the JSON schema if it does not exist.
- `binary_encoded` - will output the encoded event as a ByteArray. Requires the `ByteArraySerializer` to be set in the Kafka output config.
- `base64_encoded` - will output in base64 encoding; default is true.
- `client_certificate` - client TLS certificate for mutual TLS.
- `client_key` - client TLS key for mutual TLS.
- `ca_certificate` - CA certificate.
- `verify_mode` - SSL verify mode. Valid options are `verify_none`, `verify_peer`, `verify_client_once`, and `verify_fail_if_no_peer_cert`. Default is `verify_peer`.
Usage
Example usage with Kafka input and output.
[source,ruby]
input {
  kafka {
    ...
    codec => avro_schema_registry {
      endpoint => "http://schemas.example.com"
    }
    value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
  }
}
filter {
  ...
}
output {
  kafka {
    ...
    codec => avro_schema_registry {
      endpoint => "http://schemas.example.com"
      subject_name => "my_kafka_subject_name"
      schema_uri => "/app/my_kafka_subject.avsc"
      register_schema => true
    }
    value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
  }
}
Using a signed certificate for registry authentication
[source,ruby]
output {
  kafka {
    ...
    codec => avro_schema_registry {
      endpoint => "http://schemas.example.com"
      schema_id => 47
      client_key => "./client.key"
      client_certificate => "./client.crt"
      ca_certificate => "./ca.pem"
      verify_mode => "verify_peer"
    }
    value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
  }
}
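Using an inline schema string with compatibility checking
A hedged sketch; the subject name and schema are illustrative, not taken from the plugin's documentation:
[source,ruby]
output {
  kafka {
    ...
    codec => avro_schema_registry {
      endpoint => "http://schemas.example.com"
      subject_name => "my_subject"
      schema_string => '{"type":"record","name":"MyRecord","fields":[{"name":"message","type":"string"}]}'
      check_compatibility => true
    }
    value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
  }
}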
Constant Summary
EXCLUDE_ALWAYS = [ "@timestamp", "@version" ]
Instance Method Summary
- #clean_event(event) ⇒ Object
- #decode(data) ⇒ Object
- #encode(event) ⇒ Object
- #get_schema(schema_id) ⇒ Object
- #get_write_schema_id ⇒ Object
- #load_schema_json ⇒ Object
- #register ⇒ Object
Instance Method Details
#clean_event(event) ⇒ Object
# File 'lib/logstash/codecs/avro_schema_registry.rb', line 214

def clean_event(event)
  event_hash = event.to_hash
  event_hash.delete_if { |key, _| EXCLUDE_ALWAYS.include? key }
  event_hash
end
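As an illustration (hypothetical event, run inside the codec, which defines clean_event): Logstash adds `@timestamp` and `@version` to every event, and `clean_event` strips them so the datum matches schemas that do not declare these fields:

event = LogStash::Event.new("message" => "hello", "count" => 1)
event.to_hash       # also contains "@timestamp" and "@version", added by Logstash
clean_event(event)  # => { "message" => "hello", "count" => 1 }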
#decode(data) ⇒ Object
# File 'lib/logstash/codecs/avro_schema_registry.rb', line 221

def decode(data)
  if data.length < 5
    @logger.error('message is too small to decode')
  else
    datum = StringIO.new(Base64.strict_decode64(data)) rescue StringIO.new(data)
    magic_byte, schema_id = datum.read(5).unpack("cI>")

    if magic_byte != MAGIC_BYTE
      @logger.error('message does not start with magic byte')
    else
      schema = get_schema(schema_id)
      decoder = Avro::IO::BinaryDecoder.new(datum)
      datum_reader = Avro::IO::DatumReader.new(schema)
      yield LogStash::Event.new(datum_reader.read(decoder))
    end
  end
rescue => e
  if tag_on_failure
    @logger.error("Avro parse error, original data now in message field", :error => e)
    yield LogStash::Event.new("message" => data, "tags" => ["_avroparsefailure"])
  else
    raise e
  end
end
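For context, `decode` expects the standard Confluent wire format: a zero magic byte, a four-byte big-endian schema id, then the Avro binary payload, optionally base64-wrapped. A minimal sketch of the header handling (the schema id 47 is illustrative):

# Confluent wire format:
#   byte 0     magic byte (0)
#   bytes 1-4  schema id, unsigned 32-bit big-endian
#   bytes 5..  Avro binary payload
header = [0, 47].pack("cI>")                  # what a producer writes
magic_byte, schema_id = header.unpack("cI>")  # => magic_byte == 0, schema_id == 47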
#encode(event) ⇒ Object
# File 'lib/logstash/codecs/avro_schema_registry.rb', line 246

def encode(event)
  @write_schema_id ||= get_write_schema_id
  schema = get_schema(@write_schema_id)
  dw = Avro::IO::DatumWriter.new(schema)
  buffer = StringIO.new
  buffer.write(MAGIC_BYTE.chr)
  buffer.write([@write_schema_id].pack("I>"))
  encoder = Avro::IO::BinaryEncoder.new(buffer)
  dw.write(clean_event(event), encoder)
  if @binary_encoded
    @on_event.call(event, buffer.string.to_java_bytes)
  elsif @base64_encoded
    @on_event.call(event, Base64.strict_encode64(buffer.string))
  else
    @on_event.call(event, buffer.string)
  end
end
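A sketch of what `@on_event` receives in each mode; the payload is a stand-in for `buffer.string` above, and `to_java_bytes` assumes JRuby, which Logstash runs on:

require 'base64'
payload = "\x00\x00\x00\x00\x2Favro-bytes"  # header + Avro body (illustrative)
payload.to_java_bytes            # binary_encoded: Java byte[] for the ByteArraySerializer
Base64.strict_encode64(payload)  # base64_encoded (the default): ASCII-safe string
payload                          # neither: raw bytes in a Ruby string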
#get_schema(schema_id) ⇒ Object
# File 'lib/logstash/codecs/avro_schema_registry.rb', line 158

def get_schema(schema_id)
  unless @schemas.has_key?(schema_id)
    @schemas[schema_id] = Avro::Schema.parse(@client.schema(schema_id))
  end
  @schemas[schema_id]
end
#get_write_schema_id ⇒ Object
# File 'lib/logstash/codecs/avro_schema_registry.rb', line 175

def get_write_schema_id()
  # If schema id is passed, just use that
  if @schema_id
    @schema_id
  else
    # subject_name is required
    if @subject_name == nil
      @logger.error('requires a subject_name')
    else
      subject = @client.subject(@subject_name)
      # If schema_version, load from subject API
      if @schema_version != nil
        schema = subject.version(@schema_version)
      # Otherwise, load schema json and check with registry
      else
        schema_json = load_schema_json
        # If not compatible, raise error
        if @check_compatibility
          unless subject.compatible?(schema_json)
            @logger.error('the schema json is not compatible with the subject. you should fix your schema or change the compatibility level.')
          end
        end
        if @register_schema
          subject.register_schema(schema_json) unless subject.schema_registered?(schema_json)
        end
        schema = subject.verify_schema(schema_json)
      end
      # Return schema id
      schema.id
    end
  end
end
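A sketch of the lookup paths, using only registry calls that appear in this class and assuming the schema_registry gem is loaded; endpoint, subject, and version are illustrative:

require 'schema_registry'
schema_json = File.read("/app/my_kafka_subject.avsc")
client  = SchemaRegistry::Client.new("http://schemas.example.com", nil, nil)
subject = client.subject("my_kafka_subject_name")

subject.version(3).id                  # schema_version path
subject.verify_schema(schema_json).id  # schema_uri / schema_string path
# with schema_id set, no registry lookup happens at all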
#load_schema_json ⇒ Object
# File 'lib/logstash/codecs/avro_schema_registry.rb', line 165

def load_schema_json()
  if @schema_uri
    open(@schema_uri).read
  elsif @schema_string
    @schema_string
  else
    @logger.error('you must supply a schema_uri or schema_string in the config')
  end
end
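Note that `open(@schema_uri)` relies on Kernel#open, which, with OpenURI loaded, accepts both local file paths and http(s) URLs on the Ruby versions this plugin targets. A hedged illustration (the paths are hypothetical):

require 'open-uri'
open("/app/my_kafka_subject.avsc").read          # local file
open("http://schemas.example.com/my.avsc").read  # remote URL via OpenURI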
#register ⇒ Object
# File 'lib/logstash/codecs/avro_schema_registry.rb', line 142

def register
  @client = if client_certificate != nil
    SchemaRegistry::Client.new(endpoint, username, password, SchemaRegistry::Client.(
      client_certificate: client_certificate,
      client_key: client_key,
      ca_certificate: ca_certificate,
      verify_mode: verify_mode
    ))
  else
    SchemaRegistry::Client.new(endpoint, username, password)
  end
  @schemas = Hash.new
  @write_schema_id = nil
end