Class: ProtoTurf
- Inherits:
- Object
- Defined in:
- lib/proto_turf.rb,
lib/proto_turf/version.rb,
lib/proto_turf/confluent_schema_registry.rb
Defined Under Namespace
Classes: CachedConfluentSchemaRegistry, ConfluentSchemaRegistry
Constant Summary
- MAGIC_BYTE =
Provides a way to encode and decode messages without having to embed schemas in the encoded data. Confluent’s Schema Registry [1] is used to register a schema when encoding a message. The registry issues a schema id that is included in the encoded data alongside the actual message. When decoding the data, the schema id is used to look up the writer’s schema from the registry.
1: github.com/confluentinc/schema-registry
docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-protobuf.html
docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
[0].pack("C").freeze
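The fixed part of the wire format described here (one magic byte followed by a 4-byte big-endian schema id) can be sketched as follows. This is a minimal illustration, not the library's code: `MAGIC`, `frame_header`, and `parse_header` are hypothetical names, and only the `[0].pack("C")` value and the `"N"` pack directive come from the listings on this page.

```ruby
# Hypothetical helper names; only the magic byte value and the "N"
# (32-bit big-endian) directive are taken from the documented source.
MAGIC = [0].pack("C").freeze

# Build the fixed 5-byte prefix: magic byte + big-endian schema id.
def frame_header(schema_id)
  MAGIC + [schema_id].pack("N")
end

# Validate the magic byte and return the schema id from the prefix.
def parse_header(data)
  magic = data.byteslice(0, 1)
  raise ArgumentError, "unexpected magic byte #{magic.inspect}" unless magic == MAGIC

  data.byteslice(1, 4).unpack1("N")
end

header = frame_header(42)
p header.bytes          # => [0, 0, 0, 0, 42]
p parse_header(header)  # => 42
```

The Protobuf variant of the format places a list of message indexes between this prefix and the serialized message.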
- VERSION =
"0.0.2"
Instance Method Summary
-
#decode(data) ⇒ Object
Decodes data into the original message.
-
#encode(message, subject: nil) ⇒ Object
Encodes a message using the specified schema.
-
#initialize(registry: nil, registry_url: nil, schema_context: nil, schemas_path: nil, registry_path_prefix: nil, logger: nil, proxy: nil, user: nil, password: nil, ssl_ca_file: nil, client_cert: nil, client_key: nil, client_key_pass: nil, client_cert_data: nil, client_key_data: nil, connect_timeout: nil, resolv_resolver: nil, retry_limit: nil) ⇒ ProtoTurf
constructor
Instantiate a new ProtoTurf instance with the given configuration.
Constructor Details
#initialize(registry: nil, registry_url: nil, schema_context: nil, schemas_path: nil, registry_path_prefix: nil, logger: nil, proxy: nil, user: nil, password: nil, ssl_ca_file: nil, client_cert: nil, client_key: nil, client_key_pass: nil, client_cert_data: nil, client_key_data: nil, connect_timeout: nil, resolv_resolver: nil, retry_limit: nil) ⇒ ProtoTurf
Instantiate a new ProtoTurf instance with the given configuration.
registry - A schema registry object that responds to all methods in the ProtoTurf::ConfluentSchemaRegistry interface.
registry_url - The String URL of the schema registry that should be used.
schema_context - Schema registry context name (optional).
schemas_path - The String file system path where local schemas are stored.
registry_path_prefix - The String URL path prefix used to namespace schema registry requests (optional).
logger - The Logger that should be used to log information (optional).
proxy - Forward the request via proxy (optional).
user - User for basic auth (optional).
password - Password for basic auth (optional).
ssl_ca_file - Name of file containing CA certificate (optional).
client_cert - Name of file containing client certificate (optional).
client_key - Name of file containing client private key to go with client_cert (optional).
client_key_pass - Password to go with client_key (optional).
client_cert_data - In-memory client certificate (optional).
client_key_data - In-memory client private key to go with client_cert_data (optional).
connect_timeout - Timeout to use in the connection with the schema registry (optional).
resolv_resolver - Custom domain name resolver (optional).
retry_limit - Accepted by the constructor, but not forwarded to the registry client in the source listing for this method (optional).
# File 'lib/proto_turf.rb', line 39

def initialize(
  registry: nil,
  registry_url: nil,
  schema_context: nil,
  schemas_path: nil,
  registry_path_prefix: nil,
  logger: nil,
  proxy: nil,
  user: nil,
  password: nil,
  ssl_ca_file: nil,
  client_cert: nil,
  client_key: nil,
  client_key_pass: nil,
  client_cert_data: nil,
  client_key_data: nil,
  connect_timeout: nil,
  resolv_resolver: nil,
  retry_limit: nil
)
  @logger = logger || Logger.new($stderr)
  @path = schemas_path
  @registry = registry || ProtoTurf::CachedConfluentSchemaRegistry.new(
    ProtoTurf::ConfluentSchemaRegistry.new(
      registry_url,
      schema_context: schema_context,
      logger: @logger,
      proxy: proxy,
      user: user,
      password: password,
      ssl_ca_file: ssl_ca_file,
      client_cert: client_cert,
      client_key: client_key,
      client_key_pass: client_key_pass,
      client_cert_data: client_cert_data,
      client_key_data: client_key_data,
      path_prefix: registry_path_prefix,
      connect_timeout: connect_timeout,
      resolv_resolver: resolv_resolver
    )
  )
  @all_schemas = {}
end
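When no registry object is supplied, the constructor wraps the HTTP registry client in a caching registry. The general shape of that wrapping can be sketched as below. Both classes are hypothetical stand-ins written for illustration, not ProtoTurf::ConfluentSchemaRegistry or ProtoTurf::CachedConfluentSchemaRegistry, and they assume only the `fetch(id)` method that #decode is shown calling.

```ruby
# Hypothetical stand-in for a registry client. It counts lookups so the
# effect of caching is observable.
class InMemoryRegistry
  attr_reader :fetch_calls

  def initialize(schemas)
    @schemas = schemas
    @fetch_calls = 0
  end

  # Return the schema registered under the given id.
  def fetch(id)
    @fetch_calls += 1
    @schemas.fetch(id)
  end
end

# Hypothetical decorator that memoizes fetch results by schema id.
class CachingRegistry
  def initialize(upstream)
    @upstream = upstream
    @by_id = {}
  end

  # Ask the upstream registry only the first time an id is requested.
  def fetch(id)
    @by_id.fetch(id) { @by_id[id] = @upstream.fetch(id) }
  end
end

upstream = InMemoryRegistry.new(7 => 'syntax = "proto3"; message Ping {}')
registry = CachingRegistry.new(upstream)
2.times { registry.fetch(7) }
p upstream.fetch_calls # => 1
```

An object passed as `registry:` would need to respond to the full ProtoTurf::ConfluentSchemaRegistry interface. This sketch covers `fetch` only.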
Instance Method Details
#decode(data) ⇒ Object
Decodes data into the original message.
data - A String containing encoded data.
Returns a Protobuf AbstractMessage object instantiated with the decoded data.
# File 'lib/proto_turf.rb', line 130

def decode(data)
  stream = StringIO.new(data)

  # The first byte is MAGIC!!!
  magic_byte = stream.read(1)

  if magic_byte != MAGIC_BYTE
    raise "Expected data to begin with a magic byte, got `#{magic_byte.inspect}`"
  end

  # The schema id is a 4-byte big-endian integer.
  schema_id = stream.read(4).unpack("N").first

  # For now, we're only going to support a single message per schema. See
  # https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
  index_length = read_int(stream)
  indexes = []
  if index_length.zero?
    indexes.push(0)
  else
    index_length.times do
      indexes.push(read_int(stream))
    end
  end

  schema = @registry.fetch(schema_id)
  encoded = stream.read
  decode_protobuf(schema, encoded, indexes)
rescue Excon::Error::NotFound
  raise SchemaNotFoundError.new("Schema with id: #{schema_id} is not found on registry")
end
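The message-index step of #decode relies on a private `read_int` helper that is not shown on this page. A minimal sketch of that step follows, assuming the indexes are zig-zag encoded variable-length integers as described in Confluent's wire format documentation. `read_zigzag_varint` and `read_message_indexes` are hypothetical names and may differ from the library's helper.

```ruby
require "stringio"

# Stand-in for the undocumented read_int helper (assumption: zig-zag varint).
def read_zigzag_varint(io)
  value = 0
  shift = 0
  loop do
    byte = io.readbyte
    value |= (byte & 0x7F) << shift  # low 7 bits carry data
    break if (byte & 0x80).zero?     # high bit clear marks the last byte
    shift += 7
  end
  (value >> 1) ^ -(value & 1)        # undo zig-zag encoding
end

# Read the index list. A count of zero is shorthand for [0].
def read_message_indexes(io)
  count = read_zigzag_varint(io)
  return [0] if count.zero?

  Array.new(count) { read_zigzag_varint(io) }
end

p read_message_indexes(StringIO.new("\x00".b))          # => [0]
p read_message_indexes(StringIO.new("\x04\x02\x00".b))  # => [1, 0]
```

The single zero byte covers the common case where the message is the first one defined in its schema file.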
#encode(message, subject: nil) ⇒ Object
Encodes a message using the specified schema.
message - The message that should be encoded. Must be compatible with
the schema.
subject - The subject name the schema should be registered under in
the schema registry (optional).
Returns the encoded data as a String.
# File 'lib/proto_turf.rb', line 90

def encode(message, subject: nil)
  load_schemas! if @all_schemas.empty?

  file_descriptor = message.class.descriptor.file_descriptor
  id = register_schema(file_descriptor, subject: subject)

  stream = StringIO.new
  # Always start with the magic byte.
  stream.write(MAGIC_BYTE)

  # The schema id is encoded as a 4-byte big-endian integer.
  stream.write([id].pack("N"))

  _, indexes = find_index(message.class.descriptor.to_proto,
    file_descriptor.to_proto.message_type)

  if indexes == [0]
    write_int(stream, 0)
  else
    write_int(stream, indexes.length)
    indexes.each { |i| write_int(stream, i) }
  end

  # Now we write the actual message.
  stream.write(message.to_proto)

  stream.string
rescue Excon::Error::NotFound
  if schema_id
    raise SchemaNotFoundError.new("Schema with id: #{schema_id} is not found on registry")
  else
    raise SchemaNotFoundError.new("Schema with subject: `#{subject}` version: `#{version}` is not found on registry")
  end
end
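The byte layout that #encode produces can be sketched without the registry or a Protobuf message. The example below assumes `write_int` emits zig-zag encoded variable-length integers, as Confluent's wire format documentation describes. `write_zigzag_varint`, `write_message_indexes`, and `frame` are hypothetical names, and the payload is an arbitrary string standing in for the serialized message.

```ruby
require "stringio"

# Stand-in for the undocumented write_int helper (assumption: zig-zag varint).
def write_zigzag_varint(io, number)
  value = (number << 1) ^ (number >> 63)  # zig-zag, assuming a 64-bit range
  while value > 0x7F
    io.write([(value & 0x7F) | 0x80].pack("C"))  # set the continuation bit
    value >>= 7
  end
  io.write([value].pack("C"))
end

# Write the index list, using a single zero for the common [0] case.
def write_message_indexes(io, indexes)
  if indexes == [0]
    write_zigzag_varint(io, 0)
  else
    write_zigzag_varint(io, indexes.length)
    indexes.each { |i| write_zigzag_varint(io, i) }
  end
end

# Assemble magic byte, schema id, message indexes, and payload.
def frame(schema_id, indexes, payload)
  io = StringIO.new(String.new)  # binary buffer
  io.write([0].pack("C"))
  io.write([schema_id].pack("N"))
  write_message_indexes(io, indexes)
  io.write(payload)
  io.string
end

p frame(42, [0], "hi").bytes  # => [0, 0, 0, 0, 42, 0, 104, 105]
```

In the real method the schema id comes from registering the message's file descriptor, and the indexes locate the message type within that file.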