Class: Deimos::SchemaBackends::AvroBase

Inherits:
Base
  • Object
show all
Defined in:
lib/deimos/schema_backends/avro_base.rb

Overview

Encode / decode using Avro, either locally or via schema registry.

Direct Known Subclasses

AvroLocal, AvroSchemaRegistry, AvroValidation

Instance Attribute Summary collapse

Attributes inherited from Base

#key_schema, #namespace, #schema

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Base

#coerce, #decode, #decode_payload, #encode, #encode_payload

Constructor Details

#initialize(schema:, namespace:) ⇒ AvroBase

Returns a new instance of AvroBase.



16
17
18
19
# File 'lib/deimos/schema_backends/avro_base.rb', line 16

def initialize(schema:, namespace:)
  super(schema: schema, namespace: namespace)
  @schema_store = AvroTurf::MutableSchemaStore.new(path: Deimos.config.schema.path)
end

Instance Attribute Details

#schema_storeObject

Returns the value of attribute schema_store.



13
14
15
# File 'lib/deimos/schema_backends/avro_base.rb', line 13

def schema_store
  @schema_store
end

Class Method Details

.content_typeObject



86
87
88
# File 'lib/deimos/schema_backends/avro_base.rb', line 86

def self.content_type
  'avro/binary'
end

.mock_backendObject



81
82
83
# File 'lib/deimos/schema_backends/avro_base.rb', line 81

def self.mock_backend
  :avro_validation
end

Instance Method Details

#coerce_field(field, value) ⇒ Object



61
62
63
# File 'lib/deimos/schema_backends/avro_base.rb', line 61

def coerce_field(field, value)
  AvroSchemaCoercer.new(avro_schema).coerce_type(field.type, value)
end

#decode_key(payload, key_id) ⇒ Object



30
31
32
33
34
# File 'lib/deimos/schema_backends/avro_base.rb', line 30

def decode_key(payload, key_id)
  @key_schema ||= _generate_key_schema(key_id)
  field_name = _field_name_from_schema(@key_schema)
  decode(payload, schema: @key_schema['name'])[field_name]
end

#encode_key(key_id, key, topic: nil) ⇒ Object



22
23
24
25
26
27
# File 'lib/deimos/schema_backends/avro_base.rb', line 22

def encode_key(key_id, key, topic: nil)
  @key_schema ||= _generate_key_schema(key_id)
  field_name = _field_name_from_schema(@key_schema)
  payload = { field_name => key }
  encode(payload, schema: @key_schema['name'], topic: topic)
end

#schema_fieldsObject



66
67
68
69
70
71
# File 'lib/deimos/schema_backends/avro_base.rb', line 66

def schema_fields
  avro_schema.fields.map do |field|
    enum_values = field.type.type == 'enum' ? field.type.symbols : []
    SchemaField.new(field.name, field.type, enum_values)
  end
end

#sql_type(field) ⇒ Object

:nodoc:



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/deimos/schema_backends/avro_base.rb', line 37

def sql_type(field)
  type = field.type.type
  return type if %w(array map record).include?(type)

  if type == :union
    non_null = field.type.schemas.reject { |f| f.type == :null }
    if non_null.size > 1
      warn("WARNING: #{field.name} has more than one non-null type. Picking the first for the SQL type.")
    end
    return non_null.first.type
  end
  return type.to_sym if %w(float boolean).include?(type)
  return :integer if type == 'int'
  return :bigint if type == 'long'

  if type == 'double'
    warn('Avro `double` type turns into SQL `float` type. Please ensure you have the correct `limit` set.')
    return :float
  end

  :string
end

#validate(payload, schema:) ⇒ Object



74
75
76
77
78
# File 'lib/deimos/schema_backends/avro_base.rb', line 74

def validate(payload, schema:)
  Avro::SchemaValidator.validate!(avro_schema(schema), payload,
                                  recursive: true,
                                  fail_on_extra_fields: true)
end