Class: Deimos::SchemaBackends::ProtoSchemaRegistry

Inherits:
ProtoBase
  • Object
show all
Defined in:
lib/deimos/schema_backends/proto_schema_registry.rb

Overview

Encode / decode using the Protobuf schema registry.

Constant Summary

Constants inherited from ProtoBase

Deimos::SchemaBackends::ProtoBase::SQL_MAP

Instance Attribute Summary

Attributes inherited from Base

#key_schema, #namespace, #registry_info, #schema

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from ProtoBase

#coerce, #coerce_field, #decode_key, #encode_key, #proto_schema, #schema_fields, #sql_type, #supports_key_schemas?, #validate

Methods inherited from Base

#coerce, #coerce_field, content_type, #decode, #decode_key, #encode, #encode_key, field_type, #generate_key_schema, #initialize, #inspect, #load_schema, #schema_fields, #sql_type, #supports_class_generation?, #supports_key_schemas?, #validate

Constructor Details

This class inherits a constructor from Deimos::SchemaBackends::Base

Class Method Details

.mock_backendObject



12
13
14
# File 'lib/deimos/schema_backends/proto_schema_registry.rb', line 12

def self.mock_backend
  :proto_local
end

Instance Method Details

#decode_payload(payload, schema:) ⇒ Object



17
18
19
# File 'lib/deimos/schema_backends/proto_schema_registry.rb', line 17

def decode_payload(payload, schema:)
  self.schema_registry.decode(payload)
end

#decode_proto_key(payload) ⇒ Object



35
36
37
# File 'lib/deimos/schema_backends/proto_schema_registry.rb', line 35

def decode_proto_key(payload)
  self.key_schema_registry.decode(payload)
end

#encode_payload(payload, schema: nil, subject: nil) ⇒ Object



22
23
24
25
26
# File 'lib/deimos/schema_backends/proto_schema_registry.rb', line 22

def encode_payload(payload, schema: nil, subject: nil)
  msg = payload.is_a?(Hash) ? proto_schema.msgclass.new(**payload) : payload
  encoder = subject&.ends_with?('-key') ? self.key_schema_registry : self.schema_registry
  encoder.encode(msg, subject: subject)
end

#encode_proto_key(key, topic: nil, field: nil) ⇒ Object



29
30
31
32
# File 'lib/deimos/schema_backends/proto_schema_registry.rb', line 29

def encode_proto_key(key, topic: nil, field: nil)
  schema_text = SchemaRegistry::Output::JsonSchema.output(proto_schema.to_proto, path: field)
  self.key_schema_registry.encode(key, subject: "#{topic}-key", schema_text: schema_text)
end

#key_schema_registryObject



49
50
51
52
53
54
55
56
57
# File 'lib/deimos/schema_backends/proto_schema_registry.rb', line 49

def key_schema_registry
  @key_schema_registry ||= SchemaRegistry::Client.new(
    registry_url: @registry_info&.url || Deimos.config.schema.registry_url,
    user: @registry_info&.user || Deimos.config.schema.user,
    password: @registry_info&.password || Deimos.config.schema.password,
    logger: Karafka.logger,
    schema_type: SchemaRegistry::Schema::ProtoJsonSchema.new
  )
end

#schema_registrySchemaRegistry::Client

Returns:

  • (SchemaRegistry::Client)


40
41
42
43
44
45
46
47
# File 'lib/deimos/schema_backends/proto_schema_registry.rb', line 40

def schema_registry
  @schema_registry ||= SchemaRegistry::Client.new(
    registry_url: @registry_info&.url || Deimos.config.schema.registry_url,
    user: @registry_info&.user || Deimos.config.schema.user,
    password: @registry_info&.password || Deimos.config.schema.password,
    logger: Karafka.logger
  )
end

#write_key_proto(file, field_name) ⇒ Object

Parameters:

  • file (String)
  • field_name (String)


61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/deimos/schema_backends/proto_schema_registry.rb', line 61

def write_key_proto(file, field_name)
  return if field_name.nil?

  proto = proto_schema
  package = proto.file_descriptor.to_proto.package
  writer = SchemaRegistry::Output::ProtoText::Writer.new
  info = SchemaRegistry::Output::ProtoText::ParseInfo.new(writer, package)
  writer.write_line(%(syntax = "proto3";))
  writer.write_line("package #{package};")
  writer.writenl

  field = proto.to_proto.field.find { |f| f.name == field_name.to_s }
  writer.write_line("message #{proto.to_proto.name}Key {")
  writer.indent
  SchemaRegistry::Output::ProtoText.write_field(info, field)
  writer.dedent
  writer.write_line('}')
  path = "#{file}/#{package.gsub('.', '/')}/#{proto.to_proto.name.underscore}_key.proto"
  File.write(path, writer.string)
end