Class: LogStash::Codecs::AvroHeader
- Inherits: Base
  - Object > Base > LogStash::Codecs::AvroHeader
- Defined in: lib/logstash/codecs/avro_header.rb
Overview
Read serialized Avro records as Logstash events
This plugin serializes Logstash events as Avro datums and deserializes Avro datums into Logstash events.
Encoding
This codec serializes individual Logstash events as Avro datums (Avro binary blobs) and Base64-encodes each datum before emitting it. It does not encode Logstash events into an Avro file.
Decoding
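This codec deserializes individual Avro records. It is not for reading Avro files, which have their own container format that must be handled at the input stage. Incoming data is first Base64-decoded if possible; if that fails, the raw bytes are passed to the Avro parser.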
Usage
To read messages from a Kafka input:
    input {
      kafka {
        codec => avro {
          schema_uri => "/tmp/schema.avsc"
        }
      }
    }
    filter {
      ...
    }
    output {
      ...
    }
Avro messages may carry a header before the binary data that identifies the schema, as described in the Avro spec (avro.apache.org/docs/1.8.2/spec.html#single_object_encoding).
The header length is the number of marker bytes plus the size of the schema fingerprint, which depends on the fingerprint algorithm (avro.apache.org/docs/1.8.2/spec.html#schema_fingerprints): 8 bytes for CRC-64-AVRO, 16 bytes for MD5, or 32 bytes for SHA-256.
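As a quick illustration of that arithmetic, the Ruby sketch below adds the marker length to the fingerprint size. FINGERPRINT_BYTES and header_length_for are hypothetical names for this example, not part of the plugin. The 2-byte marker 0xC3 0x01 is the one the Avro spec defines for single-object encoding, which pairs it with a CRC-64-AVRO fingerprint; pairing the same marker with MD5 or SHA-256 is only an assumed framing.

```ruby
# Hypothetical helper: header_length = marker bytes + fingerprint bytes.
FINGERPRINT_BYTES = {
  crc64_avro: 8,  # CRC-64-AVRO (Rabin) fingerprint
  md5: 16,
  sha256: 32
}.freeze

# 2-byte marker used by the spec's single-object encoding.
SINGLE_OBJECT_MARKER = [0xC3, 0x01].freeze

def header_length_for(algorithm, marker = SINGLE_OBJECT_MARKER)
  marker.length + FINGERPRINT_BYTES.fetch(algorithm)
end

header_length_for(:crc64_avro) # => 10 (2 + 8), matches header_length => 10
header_length_for(:sha256)     # => 34 (2 + 32), assumed framing
```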
Another option is the Confluent Schema Registry (docs.confluent.io/current/schema-registry/docs/intro.html), which uses a server to assign IDs to schema versions and to look them up. This approach is supported by other Logstash plugins, e.g. github.com/revpoint/logstash-codec-avro_schema_registry.
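At a minimum, specify header_length, and the plugin will skip that many bytes before passing the input to the Avro parser. If a message is shorter than header_length, the plugin logs an error and parses the whole message as Avro.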
    input {
      kafka {
        codec => avro {
          schema_uri => "/tmp/schema.avsc"
          header_length => 10
        }
      }
    }
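To see why header_length => 10 fits single-object-encoded messages, the Ruby sketch below frames a payload the way the Avro spec describes (2-byte marker, 8-byte little-endian CRC-64-AVRO fingerprint, then the datum) and then drops the first header_length bytes, which is what the codec does when only header_length is set. frame and strip_header are hypothetical helpers, and the fingerprint is a placeholder number rather than a real schema fingerprint.

```ruby
# Single-object marker from the Avro spec, as a binary string.
MARKER = [0xC3, 0x01].pack("C*")

# Hypothetical: build marker + fingerprint + datum.
def frame(fingerprint, payload)
  # "Q<" packs an unsigned 64-bit integer in little-endian byte order.
  MARKER + [fingerprint].pack("Q<") + payload.b
end

# Hypothetical: drop the header, mirroring the codec's header_length-only path.
def strip_header(message, header_length)
  # Like the codec, pass too-short messages through unchanged.
  return message if message.bytesize < header_length
  message.byteslice(header_length, message.bytesize - header_length)
end

message = frame(0x0123456789ABCDEF, "\x02a\x02") # placeholder fingerprint
strip_header(message, 10).bytes # => [2, 97, 2]
```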
If you also specify header_marker, the plugin attempts to match those bytes at the beginning of the data. If they match, it skips the full header; otherwise it logs a "header marker mismatch" error and passes all of the data to the Avro parser.
    input {
      kafka {
        codec => avro {
          schema_uri => "/tmp/schema.avsc"
          header_length => 10
          # Marker bytes, specified as integers
          header_marker => [195, 1] # 0xC3, 0x01
        }
      }
    }
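The marker-matching behaviour can be sketched in plain Ruby without the Avro gem. position_after_header is a hypothetical helper, not the plugin's API; it follows the header branch of #decode and returns a StringIO positioned where Avro parsing would begin. One small difference from #decode: it compares bytesize rather than length, so it behaves the same for binary and UTF-8 strings.

```ruby
require "stringio"

# Hypothetical helper mirroring the header logic in #decode.
def position_after_header(binary, header_length, header_marker = [])
  io = StringIO.new(binary)
  # No header configured, or message too short: parse everything.
  return io if header_length <= 0 || binary.bytesize < header_length

  if header_marker.empty?
    io.read(header_length) # no marker: skip the header blindly
    return io
  end

  marker_bytes = io.read(header_marker.length).unpack("C*")
  if marker_bytes == header_marker
    rest = header_length - header_marker.length
    io.read(rest) if rest > 0 # skip the fingerprint bytes
  else
    io.rewind # mismatch: assume there is no header at all
  end
  io
end

payload = [0x02, 0x61, 0x02]
with_header = ([0xC3, 0x01] + [0] * 8 + payload).pack("C*")
position_after_header(with_header, 10, [195, 1]).read.bytes # => [2, 97, 2]
```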
Instance Method Summary
- #decode(data) ⇒ Object
- #encode(event) ⇒ Object
- #open_and_read(uri_string) ⇒ Object
- #register ⇒ Object
Instance Method Details
#decode(data) ⇒ Object
    # File 'lib/logstash/codecs/avro_header.rb', line 126
    def decode(data)
      begin
        binary_data = Base64.strict_decode64(data)
      rescue
        binary_data = data
      end

      datum = StringIO.new(binary_data)
      if header_length > 0
        if binary_data.length < header_length
          @logger.error('message is too small to decode header')
          # Ignore header and try to parse as Avro
        else
          if header_marker and header_marker.length > 0
            marker_length = header_marker.length
            marker = datum.read(marker_length)
            marker_bytes = marker.unpack("C" * marker_length)
            if marker_bytes == header_marker
              hash_length = header_length - marker_length
              if hash_length > 0
                datum.read(hash_length)
                # TODO: look up schema using hash
              end
            else
              @logger.error('header marker mismatch')
              # Assume that there is no header and try to parse as Avro
              datum.rewind
            end
          else
            # No marker, just read header and ignore it
            datum.read(header_length)
          end
        end
      end
      decoder = Avro::IO::BinaryDecoder.new(datum)
      datum_reader = Avro::IO::DatumReader.new(@schema)
      yield LogStash::Event.new(datum_reader.read(decoder))
    rescue => e
      if tag_on_failure
        @logger.error("Avro parse error, original data now in message field", :error => e)
        yield LogStash::Event.new("message" => data, "tags" => ["_avroparsefailure"])
      else
        raise e
      end
    end
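The first step of #decode tries strict Base64 and falls back to the raw bytes. The sketch below isolates that step; to_binary is a hypothetical name. It calls String#unpack1("m0"), which is what Base64.strict_decode64 uses internally, so it needs no require, and it rescues only ArgumentError (raised for input that is not strict Base64) instead of the bare rescue in the plugin.

```ruby
# Hypothetical helper: strict Base64 decode, else return the input unchanged.
def to_binary(data)
  data.unpack1("m0") # strict decoding: raises ArgumentError on invalid input
rescue ArgumentError
  data
end

to_binary("AmEC").bytes      # => [2, 97, 2] (Base64 text, decoded)
to_binary("\x02a\x02").bytes # => [2, 97, 2] (not Base64, returned as-is)
```

One consequence of this design is that a raw payload that happens to be valid strict Base64 text would be decoded anyway, so the fallback is a heuristic rather than a guarantee.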
#encode(event) ⇒ Object
    # File 'lib/logstash/codecs/avro_header.rb', line 173
    def encode(event)
      dw = Avro::IO::DatumWriter.new(@schema)
      buffer = StringIO.new
      encoder = Avro::IO::BinaryEncoder.new(buffer)
      dw.write(event.to_hash, encoder)
      @on_event.call(event, Base64.strict_encode64(buffer.string))
    end
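To make the output of #encode concrete, the Ruby sketch below hand-encodes a record with the Avro binary rules from the spec: a long is zig-zag encoded and then written as a variable-length integer, a string is its byte length (as a long) followed by its UTF-8 bytes, and a record is its fields written back to back in schema order. It then applies strict Base64, as #encode does. The two-field schema and the helper names are assumptions for illustration; the plugin itself delegates this work to Avro::IO::DatumWriter.

```ruby
# Assumed schema for this sketch:
#   {"type": "record", "name": "Example",
#    "fields": [{"name": "name", "type": "string"},
#               {"name": "count", "type": "long"}]}

def avro_long(n)
  z = (n << 1) ^ (n >> 63) # zig-zag: small magnitudes become small unsigned values
  bytes = []
  while z > 0x7F
    bytes << ((z & 0x7F) | 0x80) # low 7 bits, high bit set: more bytes follow
    z >>= 7
  end
  bytes << z
  bytes.pack("C*")
end

def avro_string(s)
  utf8 = s.encode("UTF-8").b
  avro_long(utf8.bytesize) + utf8
end

def encode_example(record)
  # Fields in schema order, no field tags or separators.
  datum = avro_string(record["name"]) + avro_long(record["count"])
  [datum].pack("m0") # same result as Base64.strict_encode64(datum)
end

encode_example("name" => "a", "count" => 1) # => "AmEC"
```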
#open_and_read(uri_string) ⇒ Object
    # File 'lib/logstash/codecs/avro_header.rb', line 116
    def open_and_read(uri_string)
      open(uri_string).read
    end
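open_and_read passes schema_uri straight to Kernel#open. Depending on the Ruby version, that call may not fetch http(s) URLs (open-uri stopped overriding Kernel#open in Ruby 3.0 in favour of URI.open), and Kernel#open treats a string starting with "|" as a command to run. The sketch below is a hypothetical, more explicit variant, not the plugin's code: it sends http(s) URIs through open-uri and reads anything else as a local file path.

```ruby
require "open-uri"
require "tempfile"

# Hypothetical alternative to open_and_read.
def read_schema_text(uri_string)
  if uri_string.match?(%r{\Ahttps?://}i)
    URI.parse(uri_string).read # open-uri adds #read to URI::HTTP(S)
  else
    File.read(uri_string) # opened as a file path, not as a "|command"
  end
end

# Usage with a local schema file.
Tempfile.create(["schema", ".avsc"]) do |f|
  f.write('{"type": "string"}')
  f.flush
  read_schema_text(f.path) # => "{\"type\": \"string\"}"
end
```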
#register ⇒ Object
    # File 'lib/logstash/codecs/avro_header.rb', line 121
    def register
      @schema = Avro::Schema.parse(open_and_read(schema_uri))
    end