Class: ProtoTurf

Inherits:
Object
Defined in:
lib/proto_turf.rb,
lib/proto_turf/version.rb,
lib/proto_turf/confluent_schema_registry.rb

Defined Under Namespace

Classes: CachedConfluentSchemaRegistry, ConfluentSchemaRegistry

Constant Summary

MAGIC_BYTE = [0].pack("C").freeze

Provides a way to encode and decode messages without having to embed schemas in the encoded data. Confluent’s Schema Registry is used to register a schema when encoding a message. The registry issues a schema id that is included in the encoded data alongside the actual message. When decoding the data, the schema id is used to look up the writer’s schema from the registry.

References:
github.com/confluentinc/schema-registry
docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-protobuf.html
docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
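
As a rough illustration of the framing described above, the sketch below builds and parses the five-byte header (the magic byte followed by a 4-byte big-endian schema id) using Array#pack and String#unpack1. The WireHeader module and its method names are hypothetical and not part of ProtoTurf; the message indexes and Protobuf payload that follow the header are left out.

```ruby
# Hypothetical helper module, for illustration only (not part of ProtoTurf).
module WireHeader
  # Same value as ProtoTurf::MAGIC_BYTE: a single zero byte.
  MAGIC_BYTE = [0].pack("C").freeze

  # Build the header: magic byte, then the schema id as a
  # 4-byte big-endian unsigned integer ("N").
  def self.build(schema_id)
    MAGIC_BYTE + [schema_id].pack("N")
  end

  # Check the magic byte and return the schema id stored in bytes 1..4.
  def self.parse(bytes)
    magic = bytes.byteslice(0, 1)
    unless magic == MAGIC_BYTE
      raise ArgumentError, "unexpected magic byte #{magic.inspect}"
    end
    bytes.byteslice(1, 4).unpack1("N")
  end
end

header = WireHeader.build(42)
schema_id = WireHeader.parse(header)
```

Here `header` is five bytes long and `schema_id` round-trips to the id that was passed in.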
VERSION = "0.0.2"

Constructor Details

#initialize(registry: nil, registry_url: nil, schema_context: nil, schemas_path: nil, registry_path_prefix: nil, logger: nil, proxy: nil, user: nil, password: nil, ssl_ca_file: nil, client_cert: nil, client_key: nil, client_key_pass: nil, client_cert_data: nil, client_key_data: nil, connect_timeout: nil, resolv_resolver: nil, retry_limit: nil) ⇒ ProtoTurf

Instantiate a new ProtoTurf instance with the given configuration.

registry - A schema registry object that responds to all methods in the ProtoTurf::ConfluentSchemaRegistry interface.
registry_url - The String URL of the schema registry that should be used.
schema_context - Schema registry context name (optional).
schemas_path - The String file system path where local schemas are stored.
registry_path_prefix - The String URL path prefix used to namespace schema registry requests (optional).
logger - The Logger that should be used to log information (optional). Defaults to a Logger writing to $stderr.
proxy - Forward the request via proxy (optional).
user - User for basic auth (optional).
password - Password for basic auth (optional).
ssl_ca_file - Name of file containing CA certificate (optional).
client_cert - Name of file containing client certificate (optional).
client_key - Name of file containing client private key to go with client_cert (optional).
client_key_pass - Password to go with client_key (optional).
client_cert_data - In-memory client certificate (optional).
client_key_data - In-memory client private key to go with client_cert_data (optional).
connect_timeout - Timeout to use in the connection with the schema registry (optional).
resolv_resolver - Custom domain name resolver (optional).
retry_limit - Accepted by the constructor but not documented; the constructor source on this page does not forward it to the registry client.



# File 'lib/proto_turf.rb', line 39

def initialize(
  registry: nil,
  registry_url: nil,
  schema_context: nil,
  schemas_path: nil,
  registry_path_prefix: nil,
  logger: nil,
  proxy: nil,
  user: nil,
  password: nil,
  ssl_ca_file: nil,
  client_cert: nil,
  client_key: nil,
  client_key_pass: nil,
  client_cert_data: nil,
  client_key_data: nil,
  connect_timeout: nil,
  resolv_resolver: nil,
  retry_limit: nil
)
  @logger = logger || Logger.new($stderr)
  @path = schemas_path
  @registry = registry || ProtoTurf::CachedConfluentSchemaRegistry.new(
    ProtoTurf::ConfluentSchemaRegistry.new(
      registry_url,
      schema_context: schema_context,
      logger: @logger,
      proxy: proxy,
      user: user,
      password: password,
      ssl_ca_file: ssl_ca_file,
      client_cert: client_cert,
      client_key: client_key,
      client_key_pass: client_key_pass,
      client_cert_data: client_cert_data,
      client_key_data: client_key_data,
      path_prefix: registry_path_prefix,
      connect_timeout: connect_timeout,
      resolv_resolver: resolv_resolver
    )
  )
  @all_schemas = {}
end

Instance Method Details

#decode(data) ⇒ Object

Decodes data into the original message.

data - A String containing encoded data.

Returns a Protobuf AbstractMessage object instantiated with the decoded data.



# File 'lib/proto_turf.rb', line 130

def decode(data)
  stream = StringIO.new(data)

  # The first byte is MAGIC!!!
  magic_byte = stream.read(1)

  if magic_byte != MAGIC_BYTE
    raise "Expected data to begin with a magic byte, got `#{magic_byte.inspect}`"
  end

  # The schema id is a 4-byte big-endian integer.
  schema_id = stream.read(4).unpack("N").first

  # For now, we're only going to support a single message per schema. See
  # https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
  index_length = read_int(stream)
  indexes = []
  if index_length.zero?
    indexes.push(0)
  else
    index_length.times do
      indexes.push(read_int(stream))
    end
  end

  schema = @registry.fetch(schema_id)
  encoded = stream.read
  decode_protobuf(schema, encoded, indexes)
rescue Excon::Error::NotFound
  raise SchemaNotFoundError.new("Schema with id: #{schema_id} is not found on registry")
end
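
The private read_int helper used above is not shown on this page. The sketch below assumes it reads a zigzag-encoded variable-length integer, which is what Confluent's wire format documentation specifies for message indexes. The method names read_zigzag_varint and read_message_indexes are hypothetical stand-ins, not ProtoTurf's API.

```ruby
require "stringio"

# Read one zigzag-encoded varint from an IO-like stream.
# Each byte carries 7 payload bits; the high bit marks "more bytes follow".
def read_zigzag_varint(stream)
  byte = stream.readbyte
  value = byte & 0x7F
  shift = 7
  while (byte & 0x80) != 0
    byte = stream.readbyte
    value |= (byte & 0x7F) << shift
    shift += 7
  end
  # Undo the zigzag mapping (0, -1, 1, -2, ... stored as 0, 1, 2, 3, ...).
  (value >> 1) ^ -(value & 1)
end

# Read the message-index list. A count of 0 is shorthand for [0],
# i.e. the first message type declared in the schema file.
def read_message_indexes(stream)
  count = read_zigzag_varint(stream)
  return [0] if count.zero?

  Array.new(count) { read_zigzag_varint(stream) }
end

# Bytes 0x04 0x02 0x00 encode count = 2, then the indexes 1 and 0.
stream = StringIO.new([4, 2, 0].pack("C*") + "payload")
indexes = read_message_indexes(stream)
rest = stream.read
```

After the call, the stream is positioned at the start of the Protobuf payload, which is why decode can pass the result of stream.read straight to the message decoder.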

#encode(message, subject: nil) ⇒ Object

Encodes a message, registering the schema taken from the message's file descriptor.

message - The message that should be encoded. Must be compatible with the schema.
subject - The subject name the schema should be registered under in the schema registry (optional).

Returns the encoded data as a String.



# File 'lib/proto_turf.rb', line 90

def encode(message, subject: nil)
  load_schemas! if @all_schemas.empty?

  file_descriptor = message.class.descriptor.file_descriptor
  id = register_schema(file_descriptor, subject: subject)

  stream = StringIO.new
  # Always start with the magic byte.
  stream.write(MAGIC_BYTE)

  # The schema id is encoded as a 4-byte big-endian integer.
  stream.write([id].pack("N"))

  _, indexes = find_index(message.class.descriptor.to_proto,
                          file_descriptor.to_proto.message_type)

  if indexes == [0]
    write_int(stream, 0)
  else
    write_int(stream, indexes.length)
    indexes.each { |i| write_int(stream, i) }
  end

  # Now we write the actual message.
  stream.write(message.to_proto)

  stream.string
rescue Excon::Error::NotFound
  # `schema_id` and `version` are not defined in this method; use the
  # local `id`, which is nil if registration raised before assignment.
  if id
    raise SchemaNotFoundError.new("Schema with id: #{id} is not found on registry")
  else
    raise SchemaNotFoundError.new("Schema with subject: `#{subject}` is not found on registry")
  end
end
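
The write side of the message-index list can be sketched in the same spirit. The private write_int helper is not shown on this page, so the code below assumes it emits a zigzag-encoded varint as described in Confluent's wire format documentation. write_zigzag_varint and encode_message_indexes are hypothetical names used only for this illustration.

```ruby
require "stringio"

# Write one integer as a zigzag-encoded varint, low 7 bits first.
def write_zigzag_varint(stream, n)
  n = (n << 1) ^ (n >> 63) # zigzag: small magnitudes become small unsigned values
  while (n & ~0x7F) != 0
    stream.write(((n & 0x7F) | 0x80).chr) # high bit set: more bytes follow
    n >>= 7
  end
  stream.write(n.chr)
end

# Serialize the message-index list. The common case [0] (first message
# type in the file) is written as a single 0 byte instead of "1, 0".
def encode_message_indexes(indexes)
  stream = StringIO.new(String.new) # String.new is binary (ASCII-8BIT)
  if indexes == [0]
    write_zigzag_varint(stream, 0)
  else
    write_zigzag_varint(stream, indexes.length)
    indexes.each { |i| write_zigzag_varint(stream, i) }
  end
  stream.string
end

first_message = encode_message_indexes([0])
nested_message = encode_message_indexes([1, 0])
```

The single-byte shorthand saves one byte per message whenever the encoded type is the first one declared in its .proto file, which is the most common layout.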