Class: AvroTurf::Messaging
- Inherits: Object
- Defined in: lib/avro_turf/messaging.rb
Overview
Provides a way to encode and decode messages without having to embed schemas in the encoded data. Confluent’s Schema Registry is used to register a schema when encoding a message – the registry will issue a schema id that will be included in the encoded data alongside the actual message. When decoding the data, the schema id will be used to look up the writer’s schema from the registry.
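The framing described above can be sketched with the Ruby standard library alone. This is a minimal illustration, not the library's implementation: the `WireFormat` module and its `frame`/`unframe` helpers are hypothetical names, and the Avro-encoded body is treated as opaque bytes. The header layout (one magic byte followed by a 4-byte big-endian schema id) is taken from the method listings on this page.

```ruby
# Sketch of the framing only: the Avro-encoded body is treated as opaque bytes.
module WireFormat # hypothetical name, not part of avro_turf
  MAGIC_BYTE = [0].pack("C").freeze
  HEADER_SIZE = 5 # 1 magic byte + 4-byte schema id

  module_function

  # Prefix an already Avro-encoded body with the magic byte and schema id.
  def frame(schema_id, avro_body)
    MAGIC_BYTE + [schema_id].pack("N") + avro_body.b
  end

  # Split framed data into [schema_id, avro_body].
  def unframe(data)
    bytes = data.b
    if bytes.bytesize < HEADER_SIZE
      raise ArgumentError, "data is shorter than the #{HEADER_SIZE}-byte header"
    end

    magic_byte = bytes.byteslice(0, 1)
    unless magic_byte == MAGIC_BYTE
      raise ArgumentError, "Expected data to begin with a magic byte, got `#{magic_byte.inspect}`"
    end

    # "N" is a 32-bit unsigned integer in network (big-endian) byte order.
    schema_id = bytes.byteslice(1, 4).unpack1("N")
    [schema_id, bytes.byteslice(HEADER_SIZE, bytes.bytesize - HEADER_SIZE)]
  end
end

framed = WireFormat.frame(42, "payload")
p framed.bytes.first(5)      # => [0, 0, 0, 0, 42]
p WireFormat.unframe(framed) # => [42, "payload"]
```

Because the schema id travels in a fixed-size header, a consumer can read the id before it knows anything about the schema of the body.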
Constant Summary
- MAGIC_BYTE = [0].pack("C").freeze
Instance Method Summary
- #decode(data, schema_name: nil, namespace: @namespace) ⇒ Object
  Decodes data into the original message.
- #encode(message, schema_id, schema: nil, namespace: @namespace) ⇒ Object
  Encodes a message using the specified schema.
- #initialize(registry: nil, registry_url: nil, schema_store: nil, schemas_path: nil, namespace: nil, logger: nil) ⇒ Messaging (constructor)
  Instantiate a new Messaging instance with the given configuration.
Constructor Details
#initialize(registry: nil, registry_url: nil, schema_store: nil, schemas_path: nil, namespace: nil, logger: nil) ⇒ Messaging
Instantiate a new Messaging instance with the given configuration.
registry - A schema registry object that responds to all methods in the
  AvroTurf::SchemaRegistry interface.
registry_url - The String URL of the schema registry that should be used.
schema_store - A schema store object that responds to #find(schema_name, namespace).
schemas_path - The String file system path where local schemas are stored.
namespace - The String default schema namespace.
logger - The Logger that should be used to log information (optional).
# File 'lib/avro_turf/messaging.rb', line 29

def initialize(registry: nil, registry_url: nil, schema_store: nil, schemas_path: nil, namespace: nil, logger: nil)
  @logger = logger || Logger.new($stderr)
  @namespace = namespace
  @schema_store = schema_store || SchemaStore.new(path: schemas_path || DEFAULT_SCHEMAS_PATH)
  @registry = registry || CachedSchemaRegistry.new(SchemaRegistry.new(registry_url, logger: @logger))
  @schemas_by_id = {}
end
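Since the constructor accepts any object for `registry:`, a test double can stand in for a real Schema Registry. The sketch below is an assumption-laden illustration: `InMemoryRegistry` is a hypothetical class, and it implements only the two calls that appear in the listings on this page (`register(subject, schema)` and `fetch(id)`). The full AvroTurf::SchemaRegistry interface may require further methods.

```ruby
# Hypothetical in-memory stand-in for a schema registry (not part of avro_turf).
# It covers only #register and #fetch, the two calls seen in the listings here.
class InMemoryRegistry
  def initialize
    @schemas = [] # the position in this array doubles as the schema id
  end

  # Returns the id of the schema, storing it first if it has not been seen.
  # The subject is ignored in this sketch. The schema is stringified with
  # #to_s, on the assumption that this yields its JSON form.
  def register(_subject, schema)
    json = schema.to_s
    @schemas.index(json) || (@schemas << json).size - 1
  end

  # Returns the schema JSON for an id; raises IndexError for an unknown id.
  def fetch(id)
    @schemas.fetch(id)
  end
end

registry = InMemoryRegistry.new
id = registry.register("person", '{"type": "string"}')
puts registry.fetch(id)

# With the avro_turf gem loaded, the double could then be injected, e.g.:
#   AvroTurf::Messaging.new(registry: registry, schemas_path: "path/to/schemas")
```

Injecting the registry this way keeps tests off the network, at the cost of having to keep the double in step with whatever methods the library actually calls.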
Instance Method Details
#decode(data, schema_name: nil, namespace: @namespace) ⇒ Object
Decodes data into the original message.
data - A String containing encoded data.
schema_name - The String name of the schema that should be used to decode
  the data. Must match the schema used when encoding (optional).
namespace - The namespace of the schema (optional).
Returns the decoded message.
# File 'lib/avro_turf/messaging.rb', line 77

def decode(data, schema_name: nil, namespace: @namespace)
  readers_schema = schema_name && @schema_store.find(schema_name, namespace)
  stream = StringIO.new(data)
  decoder = Avro::IO::BinaryDecoder.new(stream)

  # The first byte is MAGIC!!!
  magic_byte = decoder.read(1)

  if magic_byte != MAGIC_BYTE
    raise "Expected data to begin with a magic byte, got `#{magic_byte.inspect}`"
  end

  # The schema id is a 4-byte big-endian integer.
  schema_id = decoder.read(4).unpack("N").first

  writers_schema = @schemas_by_id.fetch(schema_id) do
    schema_json = @registry.fetch(schema_id)
    @schemas_by_id[schema_id] = Avro::Schema.parse(schema_json)
  end

  reader = Avro::IO::DatumReader.new(writers_schema, readers_schema)
  reader.read(decoder)
end
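The decode listing looks up the writer's schema through `@schemas_by_id.fetch(schema_id) do ... end`, so each schema id is fetched and parsed at most once per Messaging instance. The pattern can be sketched in isolation as follows. `SchemaCache` and `CountingRegistry` are hypothetical names used only for illustration, and `JSON.parse` stands in for `Avro::Schema.parse` so that the example needs nothing beyond the standard library.

```ruby
require "json"

# Hypothetical registry double that counts how often it is asked for a schema.
class CountingRegistry
  attr_reader :calls

  def initialize(schemas_by_id)
    @schemas_by_id = schemas_by_id
    @calls = 0
  end

  def fetch(schema_id)
    @calls += 1
    @schemas_by_id.fetch(schema_id)
  end
end

# Hypothetical wrapper showing the Hash#fetch-with-block memoization.
class SchemaCache
  def initialize(registry)
    @registry = registry
    @schemas_by_id = {}
  end

  def lookup(schema_id)
    # The block runs only on a cache miss; its value is stored and returned.
    @schemas_by_id.fetch(schema_id) do
      schema_json = @registry.fetch(schema_id)
      # JSON.parse is a stand-in for Avro::Schema.parse in this sketch.
      @schemas_by_id[schema_id] = JSON.parse(schema_json)
    end
  end
end

registry = CountingRegistry.new(7 => '{"type": "record", "name": "person", "fields": []}')
cache = SchemaCache.new(registry)
cache.lookup(7)
cache.lookup(7)
puts registry.calls # => 1
```

Using `fetch` with a block rather than `||=` means a cached value is returned on any key hit, including one whose stored value happens to be falsy.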
#encode(message, schema_id, schema: nil, namespace: @namespace) ⇒ Object
Encodes a message using the specified schema.
message - The message that should be encoded. Must be compatible with
  the schema.
schema_id - The Integer id of the schema. It is written into the encoded
  data as a 4-byte big-endian integer.
schema - The Avro schema used to encode the message.
namespace - The namespace of the schema (optional).
Returns the encoded data as a String.
# File 'lib/avro_turf/messaging.rb', line 46

def encode(message, schema_id, schema: nil, namespace: @namespace)
  # schema = @schema_store.find(schema_name, namespace)

  # Schemas are registered under the full name of the top level Avro record
  # type, or `subject` if it's provided.
  # schema_id = @registry.register(subject || schema.fullname, schema)

  stream = StringIO.new
  writer = Avro::IO::DatumWriter.new(schema)
  encoder = Avro::IO::BinaryEncoder.new(stream)

  # Always start with the magic byte.
  encoder.write(MAGIC_BYTE)

  # The schema id is encoded as a 4-byte big-endian integer.
  encoder.write([schema_id].pack("N"))

  # The actual message comes last.
  writer.write(message, encoder)

  stream.string
end