Class: AvroTurf::ConfluentSchemaRegistry

Inherits:
Object
  • Object
show all
Defined in:
lib/avro_turf/confluent_schema_registry.rb

Constant Summary collapse

CONTENT_TYPE =
"application/vnd.schemaregistry.v1+json".freeze

Instance Method Summary collapse

Constructor Details

#initialize(url, logger: Logger.new($stdout), proxy: nil, user: nil, password: nil, ssl_ca_file: nil, client_cert: nil, client_key: nil, client_key_pass: nil, client_cert_data: nil, client_key_data: nil, path_prefix: nil) ⇒ ConfluentSchemaRegistry

Returns a new instance of ConfluentSchemaRegistry.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/avro_turf/confluent_schema_registry.rb', line 6

def initialize(
  url,
  logger: Logger.new($stdout),
  proxy: nil,
  user: nil,
  password: nil,
  ssl_ca_file: nil,
  client_cert: nil,
  client_key: nil,
  client_key_pass: nil,
  client_cert_data: nil,
  client_key_data: nil,
  path_prefix: nil
)
  @path_prefix = path_prefix
  @logger = logger
  headers = Excon.defaults[:headers].merge(
    "Content-Type" => CONTENT_TYPE
  )
  headers[:proxy] = proxy unless proxy.nil?
  @connection = Excon.new(
    url,
    headers: headers,
    user: user,
    password: password,
    ssl_ca_file: ssl_ca_file,
    client_cert: client_cert,
    client_key: client_key,
    client_key_pass: client_key_pass,
    client_cert_data: client_cert_data,
    client_key_data: client_key_data
  )
end

Instance Method Details

#check(subject, schema) ⇒ Object

Check if a schema exists. Returns nil if not found.



77
78
79
80
81
82
# File 'lib/avro_turf/confluent_schema_registry.rb', line 77

def check(subject, schema)
  data = post("/subjects/#{subject}",
              expects: [200, 404],
              body: { schema: schema.to_s }.to_json)
  data unless data.has_key?("error_code")
end

#compatible?(subject, schema, version = 'latest') ⇒ Boolean

Check if a schema is compatible with the stored version. Returns:

  • true if compatible

  • nil if the subject or version does not exist

  • false if incompatible

docs.confluent.io/3.1.2/schema-registry/docs/api.html#compatibility

Returns:

  • (Boolean)


90
91
92
93
94
# File 'lib/avro_turf/confluent_schema_registry.rb', line 90

def compatible?(subject, schema, version = 'latest')
  data = post("/compatibility/subjects/#{subject}/versions/#{version}",
              expects: [200, 404], body: { schema: schema.to_s }.to_json)
  data.fetch('is_compatible', false) unless data.has_key?('error_code')
end

#fetch(id) ⇒ Object



40
41
42
43
44
# File 'lib/avro_turf/confluent_schema_registry.rb', line 40

def fetch(id)
  @logger.info "Fetching schema with id #{id}"
  data = get("/schemas/ids/#{id}")
  data.fetch("schema")
end

#global_configObject

Get global config



97
98
99
# File 'lib/avro_turf/confluent_schema_registry.rb', line 97

def global_config
  get("/config")
end

#register(subject, schema) ⇒ Object



46
47
48
49
50
51
52
53
54
# File 'lib/avro_turf/confluent_schema_registry.rb', line 46

def register(subject, schema)
  data = post("/subjects/#{subject}/versions", body: { schema: schema.to_s }.to_json)

  id = data.fetch("id")

  @logger.info "Registered schema for subject `#{subject}`; id = #{id}"

  id
end

#schema_subject_versions(schema_id) ⇒ Object

Get the subject and version for a schema id



72
73
74
# File 'lib/avro_turf/confluent_schema_registry.rb', line 72

def schema_subject_versions(schema_id)
  get("/schemas/ids/#{schema_id}/versions")
end

#subject_config(subject) ⇒ Object

Get config for subject



107
108
109
# File 'lib/avro_turf/confluent_schema_registry.rb', line 107

def subject_config(subject)
  get("/config/#{subject}")
end

#subject_version(subject, version = 'latest') ⇒ Object

Get a specific version for a subject



67
68
69
# File 'lib/avro_turf/confluent_schema_registry.rb', line 67

def subject_version(subject, version = 'latest')
  get("/subjects/#{subject}/versions/#{version}")
end

#subject_versions(subject) ⇒ Object

List all versions for a subject



62
63
64
# File 'lib/avro_turf/confluent_schema_registry.rb', line 62

def subject_versions(subject)
  get("/subjects/#{subject}/versions")
end

#subjectsObject

List all subjects



57
58
59
# File 'lib/avro_turf/confluent_schema_registry.rb', line 57

def subjects
  get('/subjects')
end

#update_global_config(config) ⇒ Object

Update global config



102
103
104
# File 'lib/avro_turf/confluent_schema_registry.rb', line 102

def update_global_config(config)
  put("/config", body: config.to_json)
end

#update_subject_config(subject, config) ⇒ Object

Update config for subject



112
113
114
# File 'lib/avro_turf/confluent_schema_registry.rb', line 112

def update_subject_config(subject, config)
  put("/config/#{subject}", body: config.to_json)
end