Class: LogStash::Codecs::Avro

Inherits:
Base
  • Object
Defined in:
lib/logstash/codecs/avro.rb

Overview

Read serialized Avro records as Logstash events

This plugin serializes Logstash events as Avro datums and deserializes Avro datums into Logstash events.

Encoding

This codec serializes individual Logstash events as Avro datums, that is, Avro binary blobs. It does not encode Logstash events into an Avro file. Note that encode, shown under Instance Method Details, Base64-encodes each blob before emitting it.

Decoding

This codec deserializes individual Avro records. It is not for reading Avro files: Avro files use their own container format, which must be handled by the input.

Usage

Example usage with Kafka input.

input {
  kafka {
    codec => avro {
      schema_uri => "/tmp/schema.avsc"
    }
  }
}
filter {
  ...
}
output {
  ...
}
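The schema_uri setting above points at an Avro schema file, which is a JSON document. As a minimal sketch, the following Ruby writes such a file using only the standard library. The record name LogEvent, its fields, and the write_schema helper are hypothetical and chosen for illustration; the original does not say what /tmp/schema.avsc contains.

```ruby
require "json"
require "tmpdir"

# Hypothetical schema: a record with two fields. A real schema must match
# the keys and types of the events being encoded or decoded.
EXAMPLE_SCHEMA = {
  "type" => "record",
  "name" => "LogEvent",
  "fields" => [
    { "name" => "message", "type" => "string" },
    { "name" => "level", "type" => ["null", "string"], "default" => nil }
  ]
}.freeze

# Write the schema as JSON to `path` and return the path.
def write_schema(path, schema = EXAMPLE_SCHEMA)
  File.write(path, JSON.pretty_generate(schema))
  path
end

schema_path = write_schema(File.join(Dir.tmpdir, "example_schema.avsc"))
puts File.read(schema_path)
```

The resulting path could then be used as the schema_uri value in a configuration like the one above.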


Instance Method Details

#decode(data) ⇒ Object



# File 'lib/logstash/codecs/avro.rb', line 73

def decode(data)
  datum = StringIO.new(Base64.strict_decode64(data)) rescue StringIO.new(data)
  decoder = Avro::IO::BinaryDecoder.new(datum)
  datum_reader = Avro::IO::DatumReader.new(@schema)
  yield LogStash::Event.new(datum_reader.read(decoder))
rescue => e
  if tag_on_failure
    @logger.error("Avro parse error, original data now in message field", :error => e)
    yield LogStash::Event.new("message" => data, "tags" => ["_avroparsefailure"])
  else
    raise e
  end
end
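The first line of decode tries strict Base64 decoding and falls back to the raw input when that raises. The standalone sketch below isolates that step using only the standard library. The helper name to_datum_io is hypothetical, and the sample bytes are an arbitrary stand-in rather than a real Avro datum. Unlike the inline rescue in the original, which catches any StandardError, this sketch narrows the rescue to ArgumentError.

```ruby
require "base64"
require "stringio"

# Hypothetical helper mirroring the first line of #decode: prefer strict
# Base64 decoding, fall back to the raw bytes if the input is not Base64.
def to_datum_io(data)
  StringIO.new(Base64.strict_decode64(data))
rescue ArgumentError
  # strict_decode64 raises ArgumentError on input that is not valid Base64.
  StringIO.new(data)
end

raw = "\x02\x06foo".b # stand-in bytes, not a real datum

from_base64 = to_datum_io(Base64.strict_encode64(raw))
from_raw    = to_datum_io(raw)

# Both paths end up wrapping the same bytes.
puts from_base64.string.bytes == raw.bytes
puts from_raw.string.bytes == raw.bytes
```

One consequence of this fallback: raw input that happens to be valid Base64 is decoded before it reaches the Avro reader.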

#encode(event) ⇒ Object



# File 'lib/logstash/codecs/avro.rb', line 88

def encode(event)
  dw = Avro::IO::DatumWriter.new(@schema)
  buffer = StringIO.new
  encoder = Avro::IO::BinaryEncoder.new(buffer)
  dw.write(event.to_hash, encoder)
  @on_event.call(event, Base64.strict_encode64(buffer.string))
end
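The last line of encode hands @on_event a Base64 string rather than raw bytes. The sketch below reproduces only that final step with the standard library. The payload is arbitrary stand-in data (not produced by Avro), and wrap_payload is a hypothetical helper name.

```ruby
require "base64"
require "stringio"

# Hypothetical helper mirroring the tail of #encode: collect bytes in a
# StringIO buffer, then emit them as strict Base64.
def wrap_payload(bytes)
  buffer = StringIO.new(String.new) # String.new yields a mutable binary string
  buffer.write(bytes)
  Base64.strict_encode64(buffer.string)
end

payload = "\x00\xFF".b * 50 # 100 stand-in bytes
strict  = wrap_payload(payload)
wrapped = Base64.encode64(payload)

# strict_encode64 never inserts line feeds; encode64 adds one after every
# 60 encoded characters, so longer payloads span several lines.
puts strict.include?("\n")
puts wrapped.include?("\n")
```

Because the strict variant stays on one line, the output pairs with the strict_decode64 call in decode.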

#open_and_read(uri_string) ⇒ Object



# File 'lib/logstash/codecs/avro.rb', line 63

def open_and_read(uri_string)
  open(uri_string).read
end
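open_and_read passes uri_string straight to open. On Ruby 3.0 and later, Kernel#open no longer opens URLs even when open-uri is loaded, so a URL-aware variant has to call URI.open explicitly. The sketch below is one possible way to do that with the standard library. The name read_schema_source is hypothetical, and treating only http and https as remote is an assumption of this example, not behavior confirmed for the plugin.

```ruby
require "open-uri"
require "tempfile"

# Hypothetical variant of #open_and_read: fetch http(s) URLs through
# URI.open and read everything else as a local file path.
def read_schema_source(uri_string)
  if uri_string.match?(%r{\Ahttps?://}i)
    URI.open(uri_string, &:read)
  else
    File.read(uri_string)
  end
end

# Usage with a local file, so no network access is needed.
Tempfile.create(["schema", ".avsc"]) do |file|
  file.write('{"type": "string"}')
  file.flush
  puts read_schema_source(file.path)
end
```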

#register ⇒ Object



# File 'lib/logstash/codecs/avro.rb', line 68

def register
  @schema = Avro::Schema.parse(open_and_read(schema_uri))
end