Class: AvroTurf::Messaging

Inherits:
Object
  • Object
show all
Defined in:
lib/avro_turf/messaging.rb

Overview

Provides a way to encode and decode messages without having to embed schemas in the encoded data. Confluent’s Schema Registry is used to register a schema when encoding a message – the registry will issue a schema id that will be included in the encoded data alongside the actual message. When decoding the data, the schema id will be used to look up the writer’s schema from the registry.

1: github.com/confluentinc/schema-registry

Constant Summary collapse

MAGIC_BYTE =
[0].pack("C").freeze

Instance Method Summary collapse

Constructor Details

#initialize(registry: nil, registry_url: nil, schema_store: nil, schemas_path: nil, namespace: nil, logger: nil) ⇒ Messaging

Instantiate a new Messaging instance with the given configuration.

registry - A schema registry object that responds to all methods in the

AvroTurf::ConfluentSchemaRegistry interface.

registry_url - The String URL of the schema registry that should be used. schema_store - A schema store object that responds to #find(schema_name, namespace). schemas_path - The String file system path where local schemas are stored. namespace - The String default schema namespace. logger - The Logger that should be used to log information (optional).



34
35
36
37
38
39
40
# File 'lib/avro_turf/messaging.rb', line 34

def initialize(registry: nil, registry_url: nil, schema_store: nil, schemas_path: nil, namespace: nil, logger: nil)
  @logger = logger || Logger.new($stderr)
  @namespace = namespace
  @schema_store = schema_store || SchemaStore.new(path: schemas_path || DEFAULT_SCHEMAS_PATH)
  @registry = registry || CachedConfluentSchemaRegistry.new(ConfluentSchemaRegistry.new(registry_url, logger: @logger))
  @schemas_by_id = {}
end

Instance Method Details

#decode(data, schema_name: nil, namespace: @namespace) ⇒ Object

Decodes data into the original message.

data - A String containing encoded data. schema_name - The String name of the schema that should be used to decode

the data. Must match the schema used when encoding (optional).

namespace - The namespace of the schema (optional).

Returns the decoded message.



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/avro_turf/messaging.rb', line 90

def decode(data, schema_name: nil, namespace: @namespace)
  readers_schema = schema_name && @schema_store.find(schema_name, namespace)
  stream = StringIO.new(data)
  decoder = Avro::IO::BinaryDecoder.new(stream)

  # The first byte is MAGIC!!!
  magic_byte = decoder.read(1)

  if magic_byte != MAGIC_BYTE
    raise "Expected data to begin with a magic byte, got `#{magic_byte.inspect}`"
  end

  # The schema id is a 4-byte big-endian integer.
  schema_id = decoder.read(4).unpack("N").first

  writers_schema = @schemas_by_id.fetch(schema_id) do
    schema_json = @registry.fetch(schema_id)
    @schemas_by_id[schema_id] = Avro::Schema.parse(schema_json)
  end

  reader = Avro::IO::DatumReader.new(writers_schema, readers_schema)
  reader.read(decoder)
rescue Excon::Error::NotFound
  raise SchemaNotFoundError.new("Schema with id: #{schema_id} is not found on registry")
end

#encode(message, schema_name: nil, namespace: @namespace, subject: nil, version: nil) ⇒ Object

Encodes a message using the specified schema.

message - The message that should be encoded. Must be compatible with

the schema.

schema_name - The String name of the schema that should be used to encode

the data.

namespace - The namespace of the schema (optional). subject - The subject name the schema should be registered under in

the schema registry (optional).

version - The integer version of the schema that should be used to decode

the data. Must match the schema used when encoding (optional).

Returns the encoded data as a String.



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/avro_turf/messaging.rb', line 55

def encode(message, schema_name: nil, namespace: @namespace, subject: nil, version: nil)
  schema_id, schema = if subject && version
    fetch_schema(subject, version)
  elsif schema_name
    register_schema(subject, schema_name, namespace)
  else
    raise ArgumentError.new('Neither schema_name nor subject + version provided to determine the schema.')
  end

  stream = StringIO.new
  writer = Avro::IO::DatumWriter.new(schema)
  encoder = Avro::IO::BinaryEncoder.new(stream)

  # Always start with the magic byte.
  encoder.write(MAGIC_BYTE)

  # The schema id is encoded as a 4-byte big-endian integer.
  encoder.write([schema_id].pack("N"))

  # The actual message comes last.
  writer.write(message, encoder)

  stream.string
rescue Excon::Error::NotFound
  raise SchemaNotFoundError.new("Schema with subject: `#{subject}` version: `#{version}` is not found on registry")
end