Class: AvroTurf::Messaging

Inherits:
Object
Defined in:
lib/avro_turf/messaging.rb

Overview

Provides a way to encode and decode messages without embedding schemas in the encoded data. Confluent’s Schema Registry (github.com/confluentinc/schema-registry) is used to register a schema when encoding a message: the registry issues a schema id that is included in the encoded data alongside the actual message. When decoding, the schema id is used to look up the writer’s schema from the registry.
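The framing described above can be sketched with Ruby's Array#pack alone. This is a minimal sketch, assuming the Avro body has already been encoded; `frame_message` is a hypothetical helper, not part of AvroTurf, and the payload bytes are arbitrary.

```ruby
# Magic byte that starts every framed message (same value as
# AvroTurf::Messaging::MAGIC_BYTE).
MAGIC_BYTE = [0].pack("C").freeze

# Hypothetical helper: prepend the magic byte and the schema id, encoded as a
# 4-byte big-endian unsigned integer ("N"), to an already Avro-encoded body.
def frame_message(schema_id, avro_body)
  MAGIC_BYTE + [schema_id].pack("N") + avro_body.b
end

framed = frame_message(42, "\x02hi".b)
p framed.bytes # prints [0, 0, 0, 0, 42, 2, 104, 105]
```

The first five bytes are pure framing; everything after them is the Avro binary encoding of the message, which is why a reader needs the registry to know how to interpret it.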

Defined Under Namespace

Classes: DecodedMessage

Constant Summary

MAGIC_BYTE =
[0].pack("C").freeze


Constructor Details

#initialize(registry: nil, registry_url: nil, schema_context: nil, schema_store: nil, schemas_path: nil, namespace: nil, registry_path_prefix: nil, logger: nil, proxy: nil, user: nil, password: nil, ssl_ca_file: nil, client_cert: nil, client_chain: nil, client_key: nil, client_key_pass: nil, client_cert_data: nil, client_chain_data: nil, client_key_data: nil, connect_timeout: nil, resolv_resolver: nil, retry_limit: nil) ⇒ Messaging

Instantiate a new Messaging instance with the given configuration.

registry - A schema registry object that responds to all methods in the AvroTurf::ConfluentSchemaRegistry interface (optional). If omitted, a cached ConfluentSchemaRegistry client is built from registry_url and the connection options below.
registry_url - The String URL of the schema registry that should be used.
schema_context - The schema registry context name (optional).
schema_store - A schema store object that responds to #find(schema_name, namespace) (optional). Defaults to a SchemaStore reading from schemas_path.
schemas_path - The String file system path where local schemas are stored (optional). Defaults to DEFAULT_SCHEMAS_PATH.
namespace - The String default schema namespace (optional).
registry_path_prefix - The String URL path prefix used to namespace schema registry requests (optional).
logger - The Logger that should be used to log information (optional). Defaults to a Logger writing to $stderr.
proxy - Proxy through which registry requests are forwarded (optional).
user - User for basic auth (optional).
password - Password for basic auth (optional).
ssl_ca_file - Name of the file containing the CA certificate (optional).
client_cert - Name of the file containing the client certificate (optional).
client_chain - Name of the file containing the client certificate chain (optional).
client_key - Name of the file containing the client private key to go with client_cert (optional).
client_key_pass - Password to go with client_key (optional).
client_cert_data - In-memory client certificate (optional).
client_chain_data - In-memory client certificate chain (optional).
client_key_data - In-memory client private key to go with client_cert_data (optional).
connect_timeout - Timeout to use when connecting to the schema registry (optional).
resolv_resolver - Custom domain name resolver (optional).
retry_limit - Retry limit passed through to the schema registry client (optional).
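Because registry only needs to respond to the methods Messaging calls, an in-memory stand-in can be handy in tests. The class below is a hypothetical sketch, not part of AvroTurf: it implements only register, fetch, subject_version and check, with return shapes inferred from how the source below uses them, and it stores each schema as schema.to_s (for an Avro::Schema that is its JSON form). Unknown ids and subjects raise KeyError here, whereas a real registry answers with HTTP 404, so Messaging's translation of Excon::Error::NotFound into SchemaNotFoundError does not apply.

```ruby
# Hypothetical in-memory stand-in for a schema registry (not part of AvroTurf).
# It implements only the calls AvroTurf::Messaging makes: register, fetch,
# subject_version and check. Schemas are stored as schema.to_s.
class InMemorySchemaRegistry
  def initialize
    @schemas_by_id = {}   # id => schema JSON
    @ids_by_subject = {}  # subject => ids, in version order
  end

  # Returns the id for the schema, reusing the id of an identical body.
  def register(subject, schema)
    body = schema.to_s
    id = @schemas_by_id.key(body) || @schemas_by_id.size + 1
    @schemas_by_id[id] = body
    versions = (@ids_by_subject[subject] ||= [])
    versions << id unless versions.include?(id)
    id
  end

  # Returns the schema JSON for an id; raises KeyError if unknown.
  def fetch(id)
    @schemas_by_id.fetch(id)
  end

  # Returns a Hash shaped like the registry's subject/version response.
  def subject_version(subject, version = "latest")
    ids = @ids_by_subject.fetch(subject)
    index = version.to_s == "latest" ? ids.size - 1 : Integer(version) - 1
    raise KeyError, "no version #{version} for #{subject}" unless index.between?(0, ids.size - 1)

    id = ids[index]
    { "subject" => subject, "version" => index + 1, "id" => id, "schema" => @schemas_by_id.fetch(id) }
  end

  # Returns a Hash with the "id" if the schema is registered under subject, else nil.
  def check(subject, schema)
    id = @schemas_by_id.key(schema.to_s)
    return nil unless id && @ids_by_subject.fetch(subject, []).include?(id)

    { "subject" => subject, "id" => id, "schema" => schema.to_s }
  end
end
```

Under these assumptions it could be passed in as AvroTurf::Messaging.new(registry: InMemorySchemaRegistry.new, schemas_path: "path/to/schemas"), where the path is a placeholder.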



# File 'lib/avro_turf/messaging.rb', line 66

def initialize(
  registry: nil,
  registry_url: nil,
  schema_context: nil,
  schema_store: nil,
  schemas_path: nil,
  namespace: nil,
  registry_path_prefix: nil,
  logger: nil,
  proxy: nil,
  user: nil,
  password: nil,
  ssl_ca_file: nil,
  client_cert: nil,
  client_chain: nil,
  client_key: nil,
  client_key_pass: nil,
  client_cert_data: nil,
  client_chain_data: nil,
  client_key_data: nil,
  connect_timeout: nil,
  resolv_resolver: nil,
  retry_limit: nil
)
  @logger = logger || Logger.new($stderr)
  @namespace = namespace
  @schema_store = schema_store || SchemaStore.new(path: schemas_path || DEFAULT_SCHEMAS_PATH)
  @registry = registry || CachedConfluentSchemaRegistry.new(
    ConfluentSchemaRegistry.new(
      registry_url,
      schema_context: schema_context,
      logger: @logger,
      proxy: proxy,
      user: user,
      password: password,
      ssl_ca_file: ssl_ca_file,
      client_cert: client_cert,
      client_chain: client_chain,
      client_key: client_key,
      client_key_pass: client_key_pass,
      client_cert_data: client_cert_data,
      client_chain_data: client_chain_data,
      client_key_data: client_key_data,
      path_prefix: registry_path_prefix,
      connect_timeout: connect_timeout,
      resolv_resolver: resolv_resolver,
      retry_limit: retry_limit
    )
  )
  @schemas_by_id = {}
end

Instance Method Details

#decode(data, schema_name: nil, namespace: @namespace) ⇒ Object

Decodes data into the original message.

data - A String containing encoded data.
schema_name - The String name of the schema that should be used to decode the data (optional). If given, it is used as the reader's schema and must be compatible with the schema used when encoding.
namespace - The namespace of the schema (optional).

Returns the decoded message.



# File 'lib/avro_turf/messaging.rb', line 187

def decode(data, schema_name: nil, namespace: @namespace)
  decode_message(data, schema_name: schema_name, namespace: namespace).message
end

#decode_message(data, schema_name: nil, namespace: @namespace) ⇒ Object

Decodes data into the original message.

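data - A String containing encoded data.
schema_name - The String name of the schema that should be used to decode the data (optional). If given, it is used as the reader's schema and must be compatible with the schema used when encoding.
namespace - The namespace of the schema (optional).

Returns a DecodedMessage struct with the following attributes:

schema_id     - The integer id of the schema used to encode the message
writer_schema - The Avro::Schema used to encode the message
reader_schema - The Avro::Schema looked up from schema_name, or nil if schema_name was not given
message       - The decoded message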
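The header checks in decode_message (source below) can be isolated with only StringIO and String#unpack1. This is a minimal sketch: `split_header` is a hypothetical helper that returns the remaining bytes instead of handing them to Avro::IO::DatumReader, and it adds a truncation check that decode_message does not have.

```ruby
require "stringio"

MAGIC_BYTE = [0].pack("C").freeze

# Hypothetical helper mirroring decode_message's header handling: verify the
# magic byte, read the 4-byte big-endian schema id, and return the id together
# with the remaining (Avro-encoded) bytes.
def split_header(data)
  stream = StringIO.new(data.b)

  magic_byte = stream.read(1)
  if magic_byte != MAGIC_BYTE
    raise "Expected data to begin with a magic byte, got `#{magic_byte.inspect}`"
  end

  id_bytes = stream.read(4)
  # Extra check not present in decode_message: reject a truncated header.
  raise "Truncated schema id" if id_bytes.nil? || id_bytes.bytesize < 4

  [id_bytes.unpack1("N"), stream.read]
end

schema_id, body = split_header("\x00\x00\x00\x00\x2a\x02hi".b)
# schema_id is 42; body holds the bytes "\x02hi"
```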


# File 'lib/avro_turf/messaging.rb', line 201

def decode_message(data, schema_name: nil, namespace: @namespace)
  readers_schema = schema_name && @schema_store.find(schema_name, namespace)
  stream = StringIO.new(data)
  decoder = Avro::IO::BinaryDecoder.new(stream)

  # The first byte must be the magic byte.
  magic_byte = decoder.read(1)

  if magic_byte != MAGIC_BYTE
    raise "Expected data to begin with a magic byte, got `#{magic_byte.inspect}`"
  end

  # The schema id is a 4-byte big-endian integer.
  schema_id = decoder.read(4).unpack1("N")

  writers_schema = @schemas_by_id.fetch(schema_id) do
    schema_json = @registry.fetch(schema_id)
    @schemas_by_id[schema_id] = Avro::Schema.parse(schema_json)
  end

  reader = Avro::IO::DatumReader.new(writers_schema, readers_schema)
  message = reader.read(decoder)

  DecodedMessage.new(schema_id, writers_schema, readers_schema, message)
rescue Excon::Error::NotFound
  raise SchemaNotFoundError.new("Schema with id: #{schema_id} is not found on registry")
end

#encode(message, schema_name: nil, namespace: @namespace, subject: nil, version: nil, schema_id: nil, validate: false, register_schemas: true) ⇒ Object

Encodes a message using the specified schema.

message - The message that should be encoded. Must be compatible with the schema.
schema_name - The String name of the schema that should be used to encode the data.
namespace - The namespace of the schema (optional).
subject - The subject name the schema should be registered under in the schema registry (optional). When given together with version, it instead identifies an already registered schema to encode with.
version - The integer version of the schema that should be used to encode the data; used together with subject (optional).
schema_id - The integer id of a registered schema that should be used to encode the data (optional).
validate - Whether to perform complete validation of the message before encoding it (default: false). If the message is invalid, Avro::SchemaValidator::ValidationError is raised with a descriptive message.
register_schemas - Whether the schema should be registered if it does not already exist (default: true). With register_schemas: false, the schema is only looked up in the registry, and SchemaNotFoundError is raised if it is not registered.

The schema is taken from the first of these that is provided: schema_id; subject together with version; schema_name. If none is provided, an ArgumentError is raised.

Returns the encoded data as a String.
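The order in which encode picks a schema source (the if/elsif chain in the source below) can be captured in a small function. This is a hypothetical sketch that only returns the name of the branch encode would take; it does not contact a registry or load any schema.

```ruby
# Hypothetical helper: mirrors encode's precedence and returns the name of the
# method encode would call to obtain the schema.
def schema_source(schema_id: nil, subject: nil, version: nil, schema_name: nil, register_schemas: true)
  if schema_id
    :fetch_schema_by_id
  elsif subject && version
    :fetch_schema
  elsif schema_name && !register_schemas
    :fetch_schema_by_body
  elsif schema_name
    :register_schema
  else
    raise ArgumentError, "Neither schema_name nor schema_id nor subject + version provided to determine the schema."
  end
end

schema_source(subject: "people", schema_name: "person") # => :register_schema
```

Note that subject without version does not select a registered version: it falls through to the schema_name branches, where it is used as the subject to register under (or to check against).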



# File 'lib/avro_turf/messaging.rb', line 139

def encode(message, schema_name: nil, namespace: @namespace, subject: nil, version: nil, schema_id: nil, validate: false,
  register_schemas: true)
  schema, schema_id = if schema_id
    fetch_schema_by_id(schema_id)
  elsif subject && version
    fetch_schema(subject: subject, version: version)
  elsif schema_name && !register_schemas
    fetch_schema_by_body(subject: subject, schema_name: schema_name, namespace: namespace)
  elsif schema_name
    register_schema(subject: subject, schema_name: schema_name, namespace: namespace)
  else
    raise ArgumentError.new("Neither schema_name nor schema_id nor subject + version provided to determine the schema.")
  end

  if validate
    Avro::SchemaValidator.validate!(schema, message, recursive: true, encoded: false, fail_on_extra_fields: true)
  end

  stream = StringIO.new
  writer = Avro::IO::DatumWriter.new(schema)
  encoder = Avro::IO::BinaryEncoder.new(stream)

  # Always start with the magic byte.
  encoder.write(MAGIC_BYTE)

  # The schema id is encoded as a 4-byte big-endian integer.
  encoder.write([schema_id].pack("N"))

  # The actual message comes last.
  writer.write(message, encoder)

  stream.string
rescue Excon::Error::NotFound
  if schema_id
    raise SchemaNotFoundError.new("Schema with id: #{schema_id} is not found on registry")
  else
    raise SchemaNotFoundError.new("Schema with subject: `#{subject}` version: `#{version}` is not found on registry")
  end
end

#fetch_schema(subject:, version: "latest") ⇒ Object

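Fetches the schema registered under the given subject and version from the registry; version defaults to "latest". Determining the schema this way skips automatic schema registration. Raises IncompatibleSchemaError if the registry reports a schema type other than AVRO. Returns the parsed schema and its integer schema id.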
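The schema type check in fetch_schema (source below) can be exercised in isolation. In this hypothetical sketch, schema_data is a Hash shaped like the registry's subject/version response, IncompatibleSchemaError is a local stand-in for AvroTurf's error class, and the helper returns the unparsed schema JSON instead of an Avro::Schema. A missing schemaType is accepted, matching the source; the registry may omit the field for Avro schemas.

```ruby
# Local stand-in for the error class raised by fetch_schema.
class IncompatibleSchemaError < StandardError; end

# Hypothetical helper: accept a subject/version response only if it describes
# an Avro schema (or has no schemaType), returning the schema JSON and id.
def avro_schema_entry(schema_data, subject)
  schema_type = schema_data["schemaType"]
  if schema_type && schema_type != "AVRO"
    raise IncompatibleSchemaError, "The #{schema_type} schema for #{subject} is incompatible."
  end

  [schema_data.fetch("schema"), schema_data.fetch("id")]
end
```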



# File 'lib/avro_turf/messaging.rb', line 232

def fetch_schema(subject:, version: "latest")
  schema_data = @registry.subject_version(subject, version)
  schema_id = schema_data.fetch("id")
  schema_type = schema_data["schemaType"]
  if schema_type && schema_type != "AVRO"
    raise IncompatibleSchemaError, "The #{schema_type} schema for #{subject} is incompatible."
  end

  schema = @schemas_by_id[schema_id] ||= Avro::Schema.parse(schema_data.fetch("schema"))

  [schema, schema_id]
end

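#fetch_schema_by_body(schema_name:, subject: nil, namespace: nil) ⇒ Object

Looks up the schema named schema_name in the local schema store and checks, without registering it, whether it is already registered under subject (or under the schema's full name if no subject is given). Raises SchemaNotFoundError if it is not registered. Returns the schema and its registry id.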



# File 'lib/avro_turf/messaging.rb', line 254

def fetch_schema_by_body(schema_name:, subject: nil, namespace: nil)
  schema = @schema_store.find(schema_name, namespace)
  schema_data = @registry.check(subject || schema.fullname, schema)
  raise SchemaNotFoundError.new("Schema with structure: #{schema} not found on registry") unless schema_data

  [schema, schema_data["id"]]
end

#fetch_schema_by_id(schema_id) ⇒ Object

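Fetches the schema with the provided schema_id from the registry. The parsed schema is cached per id, so repeated calls with the same id do not query the registry again. Returns the schema and schema_id.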
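The per-id memoization in fetch_schema_by_id (source below) follows the pattern sketched here. This is a hypothetical, dependency-free sketch: the fetch and parse lambdas stand in for @registry.fetch and Avro::Schema.parse, and a call counter is added only to make the caching observable. As with ||= in the original, a parse result of nil or false would not be cached.

```ruby
# Hypothetical sketch of the per-id cache in fetch_schema_by_id.
# `fetch` stands in for @registry.fetch and `parse` for Avro::Schema.parse.
class SchemaByIdCache
  attr_reader :registry_calls

  def initialize(fetch:, parse:)
    @fetch = fetch
    @parse = parse
    @schemas_by_id = {}
    @registry_calls = 0
  end

  def fetch_schema_by_id(schema_id)
    # Only the first lookup for an id reaches the registry.
    schema = @schemas_by_id[schema_id] ||= begin
      @registry_calls += 1
      @parse.call(@fetch.call(schema_id))
    end
    [schema, schema_id]
  end
end
```

With the avro gem loaded, parse could be ->(json) { Avro::Schema.parse(json) }.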



# File 'lib/avro_turf/messaging.rb', line 246

def fetch_schema_by_id(schema_id)
  schema = @schemas_by_id[schema_id] ||= begin
    schema_json = @registry.fetch(schema_id)
    Avro::Schema.parse(schema_json)
  end
  [schema, schema_id]
end

#register_schema(schema_name:, subject: nil, namespace: nil) ⇒ Object

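Registers the schema named schema_name, loaded from the local schema store, in the schema registry. Schemas are registered under the full name of the top-level Avro record type, or under subject if it is provided. Returns the schema and the schema id issued by the registry.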
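The subject chosen by register_schema (source below) can be approximated with the Avro full-name rules. This is a simplified, hypothetical sketch: it works on plain strings instead of calling Avro::Schema#fullname, namespace here means the namespace declared on the schema itself, and only two rules are covered: a name containing a dot is already a full name, otherwise the namespace (if any) is prepended.

```ruby
# Hypothetical helper approximating the subject register_schema uses:
# `subject` if given, otherwise the schema's Avro full name.
def registration_subject(name:, namespace: nil, subject: nil)
  return subject if subject
  # A dotted name is already a full name; any namespace is ignored.
  return name if name.include?(".")
  # No (or empty) namespace: the full name is just the name.
  return name if namespace.nil? || namespace.empty?

  "#{namespace}.#{name}"
end

registration_subject(name: "person", namespace: "com.example") # => "com.example.person"
```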



# File 'lib/avro_turf/messaging.rb', line 264

def register_schema(schema_name:, subject: nil, namespace: nil)
  schema = @schema_store.find(schema_name, namespace)
  schema_id = @registry.register(subject || schema.fullname, schema)
  [schema, schema_id]
end