Class: SchemaRegistry::Client
- Inherits:
-
Object
- Object
- SchemaRegistry::Client
- Defined in:
- lib/schema_registry_client.rb
Constant Summary collapse
- MAGIC_BYTE =
Provides a way to encode and decode messages without having to embed schemas in the encoded data. Confluent’s Schema Registry is used to register a schema when encoding a message – the registry will issue a schema id that will be included in the encoded data alongside the actual message. When decoding the data, the schema id will be used to look up the writer’s schema from the registry.
1: github.com/confluentinc/schema-registry docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-protobuf.html docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
[0].pack("C").freeze
Instance Method Summary collapse
-
#decode(data) ⇒ Object
Decodes data into the original message.
-
#encode(message, subject: nil, schema_text: nil, schema_name: nil) ⇒ String
Encodes a message using the specified schema.
-
#initialize(registry: nil, registry_url: nil, schema_context: nil, registry_path_prefix: nil, logger: nil, proxy: nil, user: nil, password: nil, ssl_ca_file: nil, client_cert: nil, client_key: nil, client_key_pass: nil, client_cert_data: nil, client_key_data: nil, connect_timeout: nil, resolv_resolver: nil, schema_type: SchemaRegistry::Schema::Protobuf) ⇒ Client
constructor
Instantiate a new SchemaRegistry instance with the given configuration.
Constructor Details
#initialize(registry: nil, registry_url: nil, schema_context: nil, registry_path_prefix: nil, logger: nil, proxy: nil, user: nil, password: nil, ssl_ca_file: nil, client_cert: nil, client_key: nil, client_key_pass: nil, client_cert_data: nil, client_key_data: nil, connect_timeout: nil, resolv_resolver: nil, schema_type: SchemaRegistry::Schema::Protobuf) ⇒ Client
Instantiate a new SchemaRegistry instance with the given configuration.
registry - A schema registry object that responds to all methods in the
SchemaRegistry::ConfluentSchemaRegistry interface.
registry_url - The String URL of the schema registry that should be used. schema_context - Schema registry context name (optional) registry_path_prefix - The String URL path prefix used to namespace schema registry requests (optional). logger - The Logger that should be used to log information (optional). proxy - Forward the request via proxy (optional). user - User for basic auth (optional). password - Password for basic auth (optional). ssl_ca_file - Name of file containing CA certificate (optional). client_cert - Name of file containing client certificate (optional). client_key - Name of file containing client private key to go with client_cert (optional). client_key_pass - Password to go with client_key (optional). client_cert_data - In-memory client certificate (optional). client_key_data - In-memory client private key to go with client_cert_data (optional). connect_timeout - Timeout to use in the connection with the schema registry (optional). resolv_resolver - Custom domain name resolver (optional). schema_type - A SchemaRegistry::Schema::Base subclass.
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/schema_registry_client.rb', line 52 def initialize( # rubocop:disable Metrics/ParameterLists registry: nil, registry_url: nil, schema_context: nil, registry_path_prefix: nil, logger: nil, proxy: nil, user: nil, password: nil, ssl_ca_file: nil, client_cert: nil, client_key: nil, client_key_pass: nil, client_cert_data: nil, client_key_data: nil, connect_timeout: nil, resolv_resolver: nil, schema_type: SchemaRegistry::Schema::Protobuf ) @logger = logger || Logger.new($stderr) @registry = registry || SchemaRegistry::CachedConfluentSchemaRegistry.new( SchemaRegistry::ConfluentSchemaRegistry.new( registry_url, schema_context: schema_context, logger: @logger, proxy: proxy, user: user, password: password, ssl_ca_file: ssl_ca_file, client_cert: client_cert, client_key: client_key, client_key_pass: client_key_pass, client_cert_data: client_cert_data, client_key_data: client_key_data, path_prefix: registry_path_prefix, connect_timeout: connect_timeout, resolv_resolver: resolv_resolver ) ) @schema = schema_type end |
Instance Method Details
#decode(data) ⇒ Object
Decodes data into the original message.
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/schema_registry_client.rb', line 117 def decode(data) stream = StringIO.new(data) # The first byte is MAGIC!!! magic_byte = stream.read(1) raise "Expected data to begin with a magic byte, got `#{magic_byte.inspect}`" if magic_byte != MAGIC_BYTE # The schema id is a 4-byte big-endian integer. schema_id = stream.read(4).unpack1("N") schema = @registry.fetch(schema_id) @schema.decode(stream, schema) rescue Excon::Error::NotFound raise SchemaNotFoundError, "Schema with id: #{schema_id} is not found on registry" end |
#encode(message, subject: nil, schema_text: nil, schema_name: nil) ⇒ String
Encodes a message using the specified schema.
99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/schema_registry_client.rb', line 99 def encode(, subject: nil, schema_text: nil, schema_name: nil) id = register_schema(, subject, schema_text: schema_text, schema_name: schema_name) stream = StringIO.new # Always start with the magic byte. stream.write(MAGIC_BYTE) # The schema id is encoded as a 4-byte big-endian integer. stream.write([id].pack("N")) @schema.encode(, stream, schema_name: schema_name) stream.string end |