Class: SchemaVersionCache

Inherits:
Object
  • Object
show all
Defined in:
lib/schema_version_cache.rb

Defined Under Namespace

Classes: Entry

Constant Summary collapse

SchemaNotFound =
Class.new(StandardError)
SubjectLookupError =
Class.new(StandardError)

Instance Method Summary collapse

Constructor Details

#initialize(registry) ⇒ SchemaVersionCache

Registry is expected to provide the following methods:

  • subject_versions: Given a subject, return an array of version numbers.

  • subject_version: Given a subject and version number, return a hash of schema data including schema ID under key “id” and schema JSON string under key “schema”.

In practice, we use AvroTurf::ConfluentSchemaRegistry, but for flexibility and ease of testing, any object providing the necessary methods will work.



17
18
19
20
21
# File 'lib/schema_version_cache.rb', line 17

# Builds an empty cache backed by the given schema registry.
#
# The registry is duck-typed; it is expected to respond to:
# * +subject_versions+ - given a subject, returns an array of version numbers.
# * +subject_version+  - given a subject and a version number, returns a hash
#   of schema data with the schema ID under "id" and the schema JSON string
#   under "schema".
#
# @param registry [#subject_versions, #subject_version] schema registry client
#   (in practice AvroTurf::ConfluentSchemaRegistry, or any stand-in for tests)
def initialize(registry)
  @registry = registry
  # Two lookup tables, both starting empty. The other methods of this class
  # index them first by subject, then by version number / schema ID.
  @by_version = {}
  @by_id = {}
end

Instance Method Details

#find_compatible_version(subject:, data:, schema_parser: nil, validator: nil) ⇒ Object



72
73
74
75
76
77
78
79
# File 'lib/schema_version_cache.rb', line 72

# Finds a schema version of +subject+ that is compatible with +data+.
#
# The search is delegated to +newest_compatible_version+. It is first run
# against whatever is already cached; only when that yields nothing is the
# subject refreshed via +add_subject_to_cache+ and the search repeated.
#
# @param subject [Object] subject to search
# @param data [Object] payload forwarded to the compatibility search
# @param schema_parser [Object, nil] forwarded unchanged to the search
# @param validator [Object, nil] forwarded unchanged to the search
# @return [Object] the first truthy result of +newest_compatible_version+;
#   otherwise the result of +schema_not_found+ (presumably raises
#   SchemaNotFound - confirm against its definition)
def find_compatible_version(subject:, data:, schema_parser: nil, validator: nil)
  search_args = { subject:, data:, schema_parser:, validator: }

  cached_match = newest_compatible_version(**search_args)
  return cached_match if cached_match

  # Nothing suitable in the cache yet: refresh this subject and retry once.
  add_subject_to_cache(subject)
  refreshed_match = newest_compatible_version(**search_args)
  refreshed_match || schema_not_found(subject:)
end

#get_current_id(subject:) ⇒ Object



45
46
47
48
49
50
51
52
# File 'lib/schema_version_cache.rb', line 45

# Returns the largest key of the subject's by-ID table. Those keys are the
# schema IDs that +get_version_number+ looks up.
#
# A non-empty cached table is answered directly. Otherwise the subject is
# refreshed via +add_subject_to_cache+ and the lookup is repeated.
#
# @param subject [Object] subject whose current schema ID is wanted
# @return [Object] the maximum cached schema ID; when none exists even after
#   the refresh, the result of +schema_not_found+ (presumably raises
#   SchemaNotFound - confirm against its definition)
def get_current_id(subject:)
  if @by_id.key?(subject)
    cached_ids = @by_id.fetch(subject).keys
    return cached_ids.max if cached_ids.any?
  end

  add_subject_to_cache(subject)
  refreshed_ids = @by_id.fetch(subject, {}).keys
  refreshed_ids.max || schema_not_found(subject:)
end

#get_schema_id(subject:, version:) ⇒ Object



54
55
56
57
58
59
60
61
# File 'lib/schema_version_cache.rb', line 54

# Returns the +id+ of the entry cached for +subject+ at +version+.
#
# A cached entry is answered directly. On a miss the subject is refreshed
# via +add_subject_to_cache+ and the lookup is repeated.
#
# @param subject [Object] subject to look up
# @param version [Object] version number within that subject
# @return [Object] the entry's +id+; when no entry (or no truthy id) exists
#   after the refresh, the result of +schema_not_found+ (presumably raises
#   SchemaNotFound - confirm against its definition)
def get_schema_id(subject:, version:)
  if @by_version.key?(subject)
    cached_versions = @by_version.fetch(subject)
    return cached_versions.fetch(version).id if cached_versions.key?(version)
  end

  add_subject_to_cache(subject)
  refreshed_entry = @by_version.dig(subject, version)
  refreshed_entry&.id || schema_not_found(subject:, version:)
end

#get_schema_json(subject:, version:) ⇒ Object



63
64
65
66
67
68
69
70
# File 'lib/schema_version_cache.rb', line 63

# Returns the +schema+ of the entry cached for +subject+ at +version+.
#
# A cached entry is answered directly. On a miss the subject is refreshed
# via +add_subject_to_cache+ and the lookup is repeated.
#
# @param subject [Object] subject to look up
# @param version [Object] version number within that subject
# @return [Object] the entry's +schema+ (per the constructor docs, the schema
#   JSON string); when no entry (or no truthy schema) exists after the
#   refresh, the result of +schema_not_found+ (presumably raises
#   SchemaNotFound - confirm against its definition)
def get_schema_json(subject:, version:)
  if @by_version.key?(subject)
    cached_versions = @by_version.fetch(subject)
    return cached_versions.fetch(version).schema if cached_versions.key?(version)
  end

  add_subject_to_cache(subject)
  refreshed_entry = @by_version.dig(subject, version)
  refreshed_entry&.schema || schema_not_found(subject:, version:)
end

#get_version_number(subject:, schema_id:) ⇒ Object



36
37
38
39
40
41
42
43
# File 'lib/schema_version_cache.rb', line 36

# Returns the +version+ of the entry cached for +subject+ under +schema_id+.
#
# A cached entry is answered directly. On a miss the subject is refreshed
# via +add_subject_to_cache+ and the lookup is repeated.
#
# @param subject [Object] subject to look up
# @param schema_id [Object] schema ID within that subject
# @return [Object] the entry's +version+; when no entry (or no truthy
#   version) exists after the refresh, the result of +schema_not_found+
#   (presumably raises SchemaNotFound - confirm against its definition)
def get_version_number(subject:, schema_id:)
  if @by_id.key?(subject)
    cached_ids = @by_id.fetch(subject)
    return cached_ids.fetch(schema_id).version if cached_ids.key?(schema_id)
  end

  add_subject_to_cache(subject)
  refreshed_entry = @by_id.dig(subject, schema_id)
  refreshed_entry&.version || schema_not_found(subject:, schema_id:)
end

#get_version_numbers(subject:) ⇒ Object



27
28
29
30
31
32
33
34
# File 'lib/schema_version_cache.rb', line 27

# Lists the version numbers cached for +subject+ (the keys of its
# by-version table).
#
# A non-empty cached list is returned as-is. Otherwise the subject is
# refreshed via +add_subject_to_cache+ and the keys are read again.
#
# @param subject [Object] subject whose versions are wanted
# @return [Array] version numbers; an empty array when the subject has no
#   cached versions even after the refresh (no error is signalled here)
def get_version_numbers(subject:)
  if @by_version.key?(subject)
    cached_numbers = @by_version.fetch(subject).keys
    return cached_numbers if cached_numbers.any?
  end

  add_subject_to_cache(subject)
  @by_version.fetch(subject, {}).keys
end

#preload(subjects) ⇒ Object



23
24
25
# File 'lib/schema_version_cache.rb', line 23

# Warms the cache by passing every given subject, in iteration order, to
# +add_subject_to_cache+.
#
# @param subjects [#each] collection of subjects to load
# @return [Object] whatever +subjects.each+ returns (for an Array, the
#   receiver itself)
def preload(subjects)
  subjects.each do |name|
    add_subject_to_cache(name)
  end
end