Class: SchemaVersionCache
- Inherits:
-
Object
- Object
- SchemaVersionCache
- Defined in:
- lib/schema_version_cache.rb
Defined Under Namespace
Classes: Entry
Constant Summary collapse
- SchemaNotFound =
Class.new(StandardError)
- SubjectLookupError =
Class.new(StandardError)
Instance Method Summary collapse
- #find_compatible_version(subject:, data:, schema_parser: nil, validator: nil) ⇒ Object
- #get_current_id(subject:) ⇒ Object
- #get_schema_id(subject:, version:) ⇒ Object
- #get_schema_json(subject:, version:) ⇒ Object
- #get_version_number(subject:, schema_id:) ⇒ Object
- #get_version_numbers(subject:) ⇒ Object
-
#initialize(registry) ⇒ SchemaVersionCache
constructor
Registry is expected to provide the following methods: - subject_versions: Given a subject, return an array of version numbers.
- #preload(subjects) ⇒ Object
Constructor Details
#initialize(registry) ⇒ SchemaVersionCache
Registry is expected to provide the following methods:
-
subject_versions: Given a subject, return an array of version numbers.
-
subject_version: Given a subject and version number, return a hash of schema data including schema ID under key “id” and schema JSON string under key “schema”.
In practice, we use AvroTurf::ConfluentSchemaRegistry, but for flexibility and ease of testing, any object providing the necessary methods will work.
17 18 19 20 21 |
# File 'lib/schema_version_cache.rb', line 17 def initialize(registry) @registry = registry @by_version = {} @by_id = {} end |
Instance Method Details
#find_compatible_version(subject:, data:, schema_parser: nil, validator: nil) ⇒ Object
72 73 74 75 76 77 78 79 |
# File 'lib/schema_version_cache.rb', line 72 def find_compatible_version(subject:, data:, schema_parser: nil, validator: nil) version = newest_compatible_version(subject:, data:, schema_parser:, validator:) return version if version add_subject_to_cache(subject) newest_compatible_version(subject:, data:, schema_parser:, validator:) || schema_not_found(subject:) end |
#get_current_id(subject:) ⇒ Object
45 46 47 48 49 50 51 52 |
# File 'lib/schema_version_cache.rb', line 45 def get_current_id(subject:) if @by_id.key?(subject) && @by_id.fetch(subject).keys.any? return @by_id.fetch(subject).keys.max end add_subject_to_cache(subject) @by_id.fetch(subject, {}).keys.max || schema_not_found(subject:) end |
#get_schema_id(subject:, version:) ⇒ Object
54 55 56 57 58 59 60 61 |
# File 'lib/schema_version_cache.rb', line 54 def get_schema_id(subject:, version:) if @by_version.key?(subject) && @by_version.fetch(subject).key?(version) return @by_version.fetch(subject).fetch(version).id end add_subject_to_cache(subject) @by_version.dig(subject, version)&.id || schema_not_found(subject:, version:) end |
#get_schema_json(subject:, version:) ⇒ Object
63 64 65 66 67 68 69 70 |
# File 'lib/schema_version_cache.rb', line 63 def get_schema_json(subject:, version:) if @by_version.key?(subject) && @by_version.fetch(subject).key?(version) return @by_version.fetch(subject).fetch(version).schema end add_subject_to_cache(subject) @by_version.dig(subject, version)&.schema || schema_not_found(subject:, version:) end |
#get_version_number(subject:, schema_id:) ⇒ Object
36 37 38 39 40 41 42 43 |
# File 'lib/schema_version_cache.rb', line 36 def get_version_number(subject:, schema_id:) if @by_id.key?(subject) && @by_id.fetch(subject).key?(schema_id) return @by_id.fetch(subject).fetch(schema_id).version end add_subject_to_cache(subject) @by_id.dig(subject, schema_id)&.version || schema_not_found(subject:, schema_id:) end |
#get_version_numbers(subject:) ⇒ Object
27 28 29 30 31 32 33 34 |
# File 'lib/schema_version_cache.rb', line 27 def get_version_numbers(subject:) if @by_version.key?(subject) && @by_version.fetch(subject).keys.any? return @by_version.fetch(subject).keys end add_subject_to_cache(subject) @by_version.fetch(subject, {}).keys end |
#preload(subjects) ⇒ Object
23 24 25 |
# File 'lib/schema_version_cache.rb', line 23 def preload(subjects) subjects.each { |subject| add_subject_to_cache(subject) } end |