Class: LogStash::Codecs::Avro
- Inherits: Base
  - Object
    - Base
      - LogStash::Codecs::Avro
- Defined in: lib/logstash/codecs/avro.rb
Overview
Read serialized Avro records as Logstash events.
This plugin serializes Logstash events as Avro datums and deserializes Avro datums into Logstash events.
Encoding
This codec serializes each Logstash event as a single Avro datum, that is, an Avro binary blob, which #encode then Base64-encodes before emitting it. It does not encode Logstash events into an Avro file.
Decoding
This codec deserializes individual Avro records. It does not read Avro files: Avro files have their own format, which must be handled at the input stage.
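To make the difference between a datum and a file concrete, the following is a hand-rolled sketch of Avro's binary encoding for two primitive types, based on the Avro specification. It is an illustration only: the codec itself delegates to the avro gem, and the helper names `encode_long` and `encode_string`, as well as the two-field record, are assumptions made up for this example.

```ruby
# Illustration only; the codec uses Avro::IO::DatumWriter, not this code.

# Avro writes int/long values as zigzag-encoded variable-length integers.
def encode_long(n)
  z = (n << 1) ^ (n >> 63) # zigzag: small magnitudes become small unsigned values
  bytes = []
  loop do
    byte = z & 0x7F
    z >>= 7
    if z.zero?
      bytes << byte
      break
    end
    bytes << (byte | 0x80) # high bit set means "more bytes follow"
  end
  bytes.pack("C*")
end

# Avro writes a string as its byte length (a long) followed by the UTF-8 bytes.
def encode_string(s)
  utf8 = s.encode("UTF-8").b
  encode_long(utf8.bytesize) + utf8
end

# Hypothetical record whose schema declares the fields name (string), then age (long).
datum = encode_string("ab") + encode_long(3)
```

The resulting `datum` holds only the field values in schema order. It carries no field names, no schema, and no file header, which is why the same schema has to be supplied on both the encoding and the decoding side.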
Usage
Example usage with Kafka input:

    input {
      kafka {
        codec => avro {
          schema_uri => "/tmp/schema.avsc"
        }
      }
    }
    filter {
      ...
    }
    output {
      ...
    }
Instance Method Summary
- #decode(data) ⇒ Object
- #encode(event) ⇒ Object
- #open_and_read(uri_string) ⇒ Object
- #register ⇒ Object
Instance Method Details
#decode(data) ⇒ Object
    # File 'lib/logstash/codecs/avro.rb', line 73
    def decode(data)
      datum = StringIO.new(Base64.strict_decode64(data)) rescue StringIO.new(data)
      decoder = Avro::IO::BinaryDecoder.new(datum)
      datum_reader = Avro::IO::DatumReader.new(@schema)
      yield LogStash::Event.new(datum_reader.read(decoder))
    rescue => e
      if tag_on_failure
        @logger.error("Avro parse error, original data now in message field", :error => e)
        yield LogStash::Event.new("message" => data, "tags" => ["_avroparsefailure"])
      else
        raise e
      end
    end
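The first line of #decode accepts either Base64 text or raw bytes. A minimal sketch of that fallback, using only the standard library, might look like the following. The helper name `payload_io` is hypothetical, and it rescues `ArgumentError` specifically, whereas the plugin's inline `rescue` modifier catches any `StandardError`.

```ruby
require "base64"
require "stringio"

# Hypothetical helper mirroring the first line of #decode:
# try strict Base64 first, fall back to the raw bytes if that fails.
def payload_io(data)
  StringIO.new(Base64.strict_decode64(data))
rescue ArgumentError
  # strict_decode64 raises ArgumentError on input that is not valid Base64
  StringIO.new(data)
end

raw     = "\x04ab\x06".b                # binary payload, not valid Base64
wrapped = Base64.strict_encode64(raw)   # the form that #encode emits

from_wrapped = payload_io(wrapped).read # Base64 path
from_raw     = payload_io(raw).read     # fallback path

# Caveat: raw bytes that happen to look like Base64 take the Base64 path.
ambiguous = payload_io("abcd").read
```

Both `from_wrapped` and `from_raw` yield the original four bytes. The `ambiguous` case shows a limitation of this heuristic: the four raw bytes `abcd` are also valid strict Base64, so they are decoded to three different bytes instead of being passed through unchanged.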
#encode(event) ⇒ Object
    # File 'lib/logstash/codecs/avro.rb', line 88
    def encode(event)
      dw = Avro::IO::DatumWriter.new(@schema)
      buffer = StringIO.new
      encoder = Avro::IO::BinaryEncoder.new(buffer)
      dw.write(event.to_hash, encoder)
      @on_event.call(event, Base64.strict_encode64(buffer.string))
    end
#open_and_read(uri_string) ⇒ Object
    # File 'lib/logstash/codecs/avro.rb', line 63
    def open_and_read(uri_string)
      open(uri_string).read
    end
#register ⇒ Object
    # File 'lib/logstash/codecs/avro.rb', line 68
    def register
      @schema = Avro::Schema.parse(open_and_read(schema_uri))
    end
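Together, #open_and_read and #register load the schema text from `schema_uri` and parse it once at startup. The sketch below shows one way to read a schema from either a local path or a URL with the standard library. The helper `read_schema_source`, the schema contents, and the temporary file are all assumptions for illustration. Note that on Ruby 3.0 and later, `Kernel#open` no longer opens URLs even when open-uri is loaded, so this sketch uses `URI.open` for the remote case.

```ruby
require "json"
require "open-uri"
require "tempfile"

# Hypothetical helper, not part of the plugin:
# read schema text from a URL or from a local file path.
def read_schema_source(uri_string)
  if uri_string.match?(%r{\A(?:https?|ftp)://}i)
    URI.open(uri_string, &:read) # remote schema via open-uri
  else
    File.read(uri_string)        # local path such as /tmp/schema.avsc
  end
end

# Illustrative schema; the record and field names are made up for this sketch.
schema_json = JSON.generate(
  "type" => "record",
  "name" => "Example",
  "fields" => [
    { "name" => "name", "type" => "string" },
    { "name" => "age",  "type" => "long" }
  ]
)

schema_file = Tempfile.new(["schema", ".avsc"])
schema_file.write(schema_json)
schema_file.flush

# The plugin hands the text to Avro::Schema.parse; JSON.parse stands in here
# so that the sketch runs without the avro gem.
schema      = JSON.parse(read_schema_source(schema_file.path))
field_names = schema["fields"].map { |field| field["name"] }
```

Because the schema is read only in #register, a missing or malformed schema file surfaces when the pipeline starts rather than when the first event arrives.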