Class: ProtoTurf::ConfluentSchemaRegistry

Inherits:
Object
  • Object
show all
Defined in:
lib/proto_turf/confluent_schema_registry.rb

Constant Summary collapse

CONTENT_TYPE =
"application/vnd.schemaregistry.v1+json".freeze

Instance Method Summary collapse

Constructor Details

#initialize(url, schema_context: nil, logger: Logger.new($stdout), proxy: nil, user: nil, password: nil, ssl_ca_file: nil, client_cert: nil, client_key: nil, client_key_pass: nil, client_cert_data: nil, client_key_data: nil, path_prefix: nil, connect_timeout: nil, resolv_resolver: nil, retry_limit: nil) ⇒ ConfluentSchemaRegistry

Returns a new instance of ConfluentSchemaRegistry.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/proto_turf/confluent_schema_registry.rb', line 8

def initialize(
  url,
  schema_context: nil,
  logger: Logger.new($stdout),
  proxy: nil,
  user: nil,
  password: nil,
  ssl_ca_file: nil,
  client_cert: nil,
  client_key: nil,
  client_key_pass: nil,
  client_cert_data: nil,
  client_key_data: nil,
  path_prefix: nil,
  connect_timeout: nil,
  resolv_resolver: nil,
  retry_limit: nil
)
  @path_prefix = path_prefix
  @schema_context_prefix = schema_context.nil? ? '' : ":.#{schema_context}:"
  @schema_context_options = schema_context.nil? ? {} : {query: {subject: @schema_context_prefix}}
  @logger = logger
  headers = Excon.defaults[:headers].merge(
    "Content-Type" => CONTENT_TYPE
  )
  params = {
    headers: headers,
    user: user,
    password: password,
    proxy: proxy,
    ssl_ca_file: ssl_ca_file,
    client_cert: client_cert,
    client_key: client_key,
    client_key_pass: client_key_pass,
    client_cert_data: client_cert_data,
    client_key_data: client_key_data,
    resolv_resolver: resolv_resolver,
    connect_timeout: connect_timeout,
    retry_limit: retry_limit
  }
  # Remove nil params to allow Excon to use its default values
  params.reject! { |_, v| v.nil? }
  @connection = Excon.new(
    url,
    params
  )
end

Instance Method Details

#fetch(id) ⇒ String

Returns the schema string stored in the registry for the given id.

Parameters:

  • id (Integer)

    the schema ID to fetch

Returns:

  • (String)

    the schema string stored in the registry for the given id



58
59
60
61
62
# File 'lib/proto_turf/confluent_schema_registry.rb', line 58

def fetch(id)
  @logger.info "Fetching schema with id #{id}"
  data = get("/schemas/ids/#{id}", idempotent: true, **@schema_context_options, )
  data.fetch("schema")
end

#register(subject, schema, references: []) ⇒ Integer

Returns the ID of the registered schema.

Parameters:

  • subject (String)

    the subject to check

  • schema (String)

    the schema text to check

  • references (Array<Hash>) (defaults to: [])

    optional references to other schemas

Returns:

  • (Integer)

    the ID of the registered schema



74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/proto_turf/confluent_schema_registry.rb', line 74

def register(subject, schema, references: [])
  data = post("/subjects/#{@schema_context_prefix}#{CGI.escapeURIComponent(subject)}/versions",
              body: { schemaType: 'PROTOBUF',
                      references: references,
                      schema: schema.to_s }.to_json)

  id = data.fetch("id")

  @logger.info "Registered schema for subject `#{@schema_context_prefix}#{subject}`; id = #{id}"

  id
end

#schema_subject_versions(schema_id) ⇒ Array<Hash>

Returns an array of versions for the given schema ID.

Parameters:

  • schema_id (Integer)

    the schema ID to fetch versions for

Returns:

  • (Array<Hash>)

    an array of versions for the given schema ID



66
67
68
# File 'lib/proto_turf/confluent_schema_registry.rb', line 66

def schema_subject_versions(schema_id)
  get("/schemas/ids/#{schema_id}/versions", idempotent: true, **@schema_context_options)
end

#subject_versions(subject) ⇒ Array<Hash>

Returns an array of versions for the given subject.

Parameters:

  • subject (String)

Returns:

  • (Array<Hash>)

    an array of versions for the given subject



89
90
91
# File 'lib/proto_turf/confluent_schema_registry.rb', line 89

def subject_versions(subject)
  get("/subjects/#{@schema_context_prefix}#{CGI.escapeURIComponent(subject)}/versions", idempotent: true)
end