Class: LogStash::Codecs::AvroHeader

Inherits:
Base

Defined in:
lib/logstash/codecs/avro_header.rb

Overview

Read serialized Avro records as Logstash events

This plugin is used to serialize Logstash events as Avro datums, as well as to deserialize Avro datums into Logstash events.

Encoding

This codec is for serializing individual Logstash events as Avro datums that are Avro binary blobs. It does not encode Logstash events into an Avro file.

Decoding

This codec is for deserializing individual Avro records. It is not for reading Avro files. Avro files have a unique format that must be handled upon input.

Usage

To read messages from Kafka input:

[source,ruby]
----
input {
  kafka {
    codec => avro {
      schema_uri => "/tmp/schema.avsc"
    }
  }
}
filter {
  ...
}
output {
  ...
}
----


Avro messages may carry a header before the binary data that identifies the schema, as described in avro.apache.org/docs/1.8.2/spec.html#single_object_encoding[the Avro spec].

The length of the header depends on how many bytes are used for a marker and the avro.apache.org/docs/1.8.2/spec.html#schema_fingerprints[fingerprint algorithm]. The fingerprint might be 8 bytes for CRC-64-AVRO, 16 bytes for MD5, or 32 bytes for SHA-256.

Another option is the docs.confluent.io/current/schema-registry/docs/intro.html[Confluent Schema Registry], which uses a server to assign IDs to different versions of schemas and look them up. That approach is supported by other Logstash plugins, e.g. github.com/revpoint/logstash-codec-avro_schema_registry.

At a minimum, specify the header length, and the plugin will skip those bytes before passing the input to the Avro parser.

[source,ruby]
----
input {
  kafka {
    codec => avro {
      schema_uri => "/tmp/schema.avsc"
      header_length => 10
    }
  }
}
----
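The value 10 above matches Avro's single-object encoding: a 2-byte marker (0xC3, 0x01) followed by an 8-byte CRC-64-AVRO schema fingerprint. A minimal sketch of that arithmetic, using only values from the Avro spec:

```ruby
# Header layout for Avro single-object encoding:
#   2-byte marker (0xC3, 0x01) + schema fingerprint.
MARKER = [0xC3, 0x01].freeze
FINGERPRINT_BYTES = 8  # CRC-64-AVRO; MD5 would be 16, SHA-256 would be 32

header_length = MARKER.length + FINGERPRINT_BYTES
puts header_length  # => 10
```

With an MD5 fingerprint the same layout would give `header_length => 18`, and with SHA-256 it would give `header_length => 34`.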


If you specify the header marker, the plugin will attempt to match those bytes at the beginning of the data. If they match, it will skip the header bytes, otherwise it will pass all the data to the Avro parser.

[source,ruby]
----
input {
  kafka {
    codec => avro {
      schema_uri => "/tmp/schema.avsc"
      header_length => 10
      # Marker bytes, specified as integers
      header_marker => [195, 1] # 0xC3, 0x01
    }
  }
}
----
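The marker check behaves as in the following stdlib-only sketch (the method name `skip_header` is illustrative, not part of the plugin's API):

```ruby
require 'stringio'

# Skip a single-object-encoding header if the marker matches;
# otherwise rewind and treat the whole payload as a bare Avro datum.
def skip_header(binary_data, header_length, header_marker)
  datum = StringIO.new(binary_data)
  marker = datum.read(header_marker.length)
  if marker && marker.unpack("C*") == header_marker
    datum.read(header_length - header_marker.length)  # discard the fingerprint
  else
    datum.rewind
  end
  datum
end

payload = [0xC3, 0x01].pack("C*") + "\x00" * 8 + "avro-bytes"
io = skip_header(payload, 10, [0xC3, 0x01])
io.read  # => "avro-bytes"
```

A payload without the marker is passed through untouched, which is why a marker mismatch is logged but not fatal.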


Instance Method Summary

Instance Method Details

#decode(data) ⇒ Object



# File 'lib/logstash/codecs/avro_header.rb', line 126

def decode(data)
  begin
    binary_data = Base64.strict_decode64(data)
  rescue
    binary_data = data
  end

  datum = StringIO.new(binary_data)
  if header_length > 0
    if binary_data.length < header_length
      @logger.error('message is too small to decode header')
      # Ignore header and try to parse as Avro
    else
      if header_marker and header_marker.length > 0
        marker_length = header_marker.length
        marker = datum.read(marker_length)
        marker_bytes = marker.unpack("C" * marker_length)
        if marker_bytes == header_marker
          hash_length = header_length - marker_length
          if hash_length > 0
            datum.read(hash_length)
            # TODO: look up schema using hash
          end
        else
          @logger.error('header marker mismatch')
          # Assume that there is no header and try to parse as Avro
          datum.rewind
        end
      else
        # No marker, just read header and ignore it
        datum.read(header_length)
      end
    end
  end
  decoder = Avro::IO::BinaryDecoder.new(datum)
  datum_reader = Avro::IO::DatumReader.new(@schema)
  yield LogStash::Event.new(datum_reader.read(decoder))
rescue => e
  if tag_on_failure
    @logger.error("Avro parse error, original data now in message field", :error => e)
    yield LogStash::Event.new("message" => data, "tags" => ["_avroparsefailure"])
  else
    raise e
  end
end

#encode(event) ⇒ Object



# File 'lib/logstash/codecs/avro_header.rb', line 173

def encode(event)
  dw = Avro::IO::DatumWriter.new(@schema)
  buffer = StringIO.new
  encoder = Avro::IO::BinaryEncoder.new(buffer)
  dw.write(event.to_hash, encoder)
  @on_event.call(event, Base64.strict_encode64(buffer.string))
end
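`#encode` wraps the binary datum in strict Base64, which is why `#decode` first tries `Base64.strict_decode64` and falls back to treating the input as raw bytes: `strict_decode64` raises `ArgumentError` on input that is not valid Base64. A stdlib-only sketch of that fallback (the helper name `unwrap` is illustrative):

```ruby
require 'base64'

# Undo the Base64 wrapping applied by encode; pass raw bytes through.
def unwrap(data)
  Base64.strict_decode64(data)
rescue ArgumentError
  data  # not Base64-wrapped; assume raw Avro bytes
end

unwrap(Base64.strict_encode64("datum"))  # => "datum"
unwrap("raw \x00 bytes")                 # returned unchanged
```

Note the caveat: raw binary that happens to be valid Base64 will be decoded, so producers and consumers should agree on whether payloads are Base64-wrapped.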

#open_and_read(uri_string) ⇒ Object



# File 'lib/logstash/codecs/avro_header.rb', line 116

def open_and_read(uri_string)
  open(uri_string).read
end

#register ⇒ Object



# File 'lib/logstash/codecs/avro_header.rb', line 121

def register
  @schema = Avro::Schema.parse(open_and_read(schema_uri))
end