Class: Deimos::SchemaBackends::AvroBase

Inherits:
Base
  • Object
show all
Defined in:
lib/deimos/schema_backends/avro_base.rb

Overview

Encode / decode using Avro, either locally or via schema registry.

Direct Known Subclasses

AvroLocal, AvroSchemaRegistry, AvroValidation

Instance Attribute Summary collapse

Attributes inherited from Base

#key_schema, #namespace, #schema

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Base

#coerce, #decode, #decode_payload, #encode, #encode_payload

Constructor Details

#initialize(schema:, namespace:) ⇒ AvroBase

Returns a new instance of AvroBase.



16
17
18
19
# File 'lib/deimos/schema_backends/avro_base.rb', line 16

# Builds the backend and prepares the schema store it reads from.
#
# Both keywords are passed straight through to the parent constructor.
#
# @param schema [Object] forwarded unchanged to the parent initializer
# @param namespace [Object] forwarded unchanged to the parent initializer
def initialize(schema:, namespace:)
  super(schema: schema, namespace: namespace)

  # The store is rooted at the globally configured schema path.
  store_path = Deimos.config.schema.path
  @schema_store = AvroTurf::MutableSchemaStore.new(path: store_path)
end

Instance Attribute Details

#schema_store ⇒ Object

Returns the value of attribute schema_store.



13
14
15
# File 'lib/deimos/schema_backends/avro_base.rb', line 13

# Reader for the schema store held in @schema_store.
#
# @return [Object] the current value of @schema_store
def schema_store
  @schema_store
end

Class Method Details

.content_type ⇒ Object



92
93
94
# File 'lib/deimos/schema_backends/avro_base.rb', line 92

# Content type string reported by this backend.
#
# @return [String] always 'avro/binary'
def self.content_type
  'avro/binary'
end

.field_type(avro_schema) ⇒ String

Converts an Avro::Schema::NamedSchema to its String form for generated YARD docs. Recursively handles the typing for Arrays, Maps and Unions.

Parameters:

  • avro_schema (Avro::Schema::NamedSchema)

Returns:

  • (String)

    A string representation of the Type of this SchemaField



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/deimos/schema_backends/avro_base.rb', line 106

# Renders an Avro schema as the type string used in generated YARD docs.
#
# Container types (array, map, union) recurse into their member schemas.
# A type with no matching branch yields nil.
#
# @param avro_schema [Avro::Schema::NamedSchema] schema to describe
# @return [String, nil] the type string, or nil when the type is not handled
def self.field_type(avro_schema)
  kind = avro_schema.type_sym

  case kind
  when :string, :boolean then kind.to_s.titleize
  when :int, :long then 'Integer'
  when :float, :double then 'Float'
  when :record, :enum then schema_classname(avro_schema)
  when :array
    # Member schemas are wrapped in a throwaway SchemaField before recursing.
    "Array<#{field_type(Deimos::SchemaField.new('n/a', avro_schema.items).type)}>"
  when :map
    "Hash<String, #{field_type(Deimos::SchemaField.new('n/a', avro_schema.values).type)}>"
  when :union
    member_types = avro_schema.schemas.map { |member| field_type(Deimos::SchemaField.new('n/a', member).type) }
    member_types.join(', ')
  when :null then 'nil'
  end
end

.mock_backend ⇒ Object



87
88
89
# File 'lib/deimos/schema_backends/avro_base.rb', line 87

# Identifier of the backend to use in place of this one when mocking.
# NOTE(review): presumably refers to the AvroValidation subclass — confirm
# against the backend registry.
#
# @return [Symbol] always :avro_validation
def self.mock_backend
  :avro_validation
end

.schema_base_class(schema) ⇒ Avro::Schema::NamedSchema

Returns the base type of this schema. Unwraps Arrays, Maps and Unions (a Union resolves to its first non-null member).

Parameters:

  • schema (Avro::Schema::NamedSchema)

Returns:

  • (Avro::Schema::NamedSchema)


135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/deimos/schema_backends/avro_base.rb', line 135

# Strips container layers off a schema to reach its underlying type.
#
# Arrays recurse into their item schema and maps into their value schema.
# A union resolves every member and yields the first whose type is not
# :null, or nil when none qualifies. Any other schema is returned as-is.
#
# @param schema [Avro::Schema::NamedSchema] schema to unwrap
# @return [Avro::Schema::NamedSchema, nil] the innermost schema
def self.schema_base_class(schema)
  case schema.type_sym
  when :array then schema_base_class(schema.items)
  when :map then schema_base_class(schema.values)
  when :union
    # Resolve all members first, then pick the first non-null result.
    resolved = schema.schemas.map { |member| schema_base_class(member) }
    resolved.find { |candidate| candidate.type_sym != :null }
  else
    schema
  end
end

.schema_classname(schema) ⇒ String

Parameters:

  • schema (Avro::Schema::NamedSchema)

    A named schema

Returns:

  • (String)


98
99
100
# File 'lib/deimos/schema_backends/avro_base.rb', line 98

# Derives a class name from a named schema.
#
# The schema's name is passed through underscore, camelize and singularize
# in that order (string inflections — presumably ActiveSupport's; confirm).
#
# @param schema [Avro::Schema::NamedSchema] a named schema
# @return [String] the derived class name
def self.schema_classname(schema)
  snake_cased = schema.name.underscore
  snake_cased.camelize.singularize
end

Instance Method Details

#coerce_field(field, value) ⇒ Object



61
62
63
# File 'lib/deimos/schema_backends/avro_base.rb', line 61

# Coerces a value to the type declared by a field.
#
# A new AvroSchemaCoercer is built around this backend's schema on every
# call, and the coercion is delegated to it.
#
# @param field [#type] field whose type the value should match
# @param value [Object] value to coerce
# @return [Object] whatever the coercer returns for that type and value
def coerce_field(field, value)
  coercer = AvroSchemaCoercer.new(avro_schema)
  coercer.coerce_type(field.type, value)
end

#decode_key(payload, key_id) ⇒ Object



30
31
32
33
34
# File 'lib/deimos/schema_backends/avro_base.rb', line 30

# Decodes an encoded key and returns the single value it carries.
#
# The key schema is generated from key_id on first use and memoized in
# @key_schema; later calls reuse it whatever key_id they pass.
#
# @param payload [Object] encoded key, handed to #decode
# @param key_id [Object] identifier used to generate the key schema
# @return [Object] the decoded value stored under the key schema's field name
def decode_key(payload, key_id)
  @key_schema ||= _generate_key_schema(key_id)
  key_field = _field_name_from_schema(@key_schema)
  decoded = decode(payload, schema: @key_schema['name'])
  decoded[key_field]
end

#encode_key(key_id, key, topic: nil) ⇒ Object



22
23
24
25
26
27
# File 'lib/deimos/schema_backends/avro_base.rb', line 22

# Encodes a key by wrapping it in a single-field payload.
#
# The key schema is generated from key_id on first use and memoized in
# @key_schema; later calls reuse it whatever key_id they pass.
#
# @param key_id [Object] identifier used to generate the key schema
# @param key [Object] the key value to encode
# @param topic [Object, nil] forwarded unchanged to #encode
# @return [Object] whatever #encode returns
def encode_key(key_id, key, topic: nil)
  @key_schema ||= _generate_key_schema(key_id)
  key_field = _field_name_from_schema(@key_schema)
  encode({ key_field => key }, schema: @key_schema['name'], topic: topic)
end

#load_schema ⇒ Avro::Schema

Returns:

  • (Avro::Schema)


82
83
84
# File 'lib/deimos/schema_backends/avro_base.rb', line 82

# Loads the schema for this backend by delegating to #avro_schema with no
# arguments.
#
# @return [Avro::Schema]
def load_schema
  avro_schema
end

#schema_fields ⇒ Object



66
67
68
69
70
71
# File 'lib/deimos/schema_backends/avro_base.rb', line 66

# Builds one SchemaField for each field of this backend's Avro schema.
#
# Enum-typed fields carry their symbols as the third constructor argument;
# every other field carries an empty array there.
#
# @return [Array<SchemaField>] one entry per schema field, in schema order
def schema_fields
  avro_schema.fields.map { |avro_field|
    field_schema = avro_field.type
    allowed_values =
      if field_schema.type == 'enum'
        field_schema.symbols
      else
        []
      end
    SchemaField.new(avro_field.name, avro_field.type, allowed_values, avro_field.default)
  }
end

#sql_type(field) ⇒ Object

:nodoc:



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/deimos/schema_backends/avro_base.rb', line 37

# Maps an Avro field onto the type of the SQL column that should hold it.
#
# - array, map and record are returned unchanged as their Avro type name.
# - A union is resolved to its first non-null member, which is then mapped
#   like any other type. A warning is emitted if there is more than one
#   non-null member.
# - float and boolean map to the Symbol of the same name, int to :integer,
#   long to :bigint and double to :float (with a warning about `limit`).
# - Anything else, including a union with no non-null member, is :string.
#
# @param field [#name, #type] field whose `type` responds to `type` (the
#   Avro type name) and, for unions, to `schemas`
# @return [Symbol, String] the SQL type
def sql_type(field)
  schema = field.type

  # Type names are compared as Strings throughout this method, so normalise
  # with to_s. The previous comparison against the Symbols :union and :null
  # could never match a String name, which left nullable fields mapped to
  # :string.
  if schema.type.to_s == 'union'
    non_null = schema.schemas.reject { |member| member.type.to_s == 'null' }
    if non_null.size > 1
      warn("WARNING: #{field.name} has more than one non-null type. Picking the first for the SQL type.")
    end
    # Continue with the chosen member so it receives the same SQL mapping
    # as a non-union field of that type.
    schema = non_null.first
  end

  # schema is nil only for a union without a non-null member; the empty
  # name then falls through to the :string default below.
  type = schema ? schema.type.to_s : ''
  return type if %w(array map record).include?(type)
  return type.to_sym if %w(float boolean).include?(type)
  return :integer if type == 'int'
  return :bigint if type == 'long'

  if type == 'double'
    warn('Avro `double` type turns into SQL `float` type. Please ensure you have the correct `limit` set.')
    return :float
  end

  :string
end

#validate(payload, schema:) ⇒ Object



74
75
76
77
78
# File 'lib/deimos/schema_backends/avro_base.rb', line 74

# Validates a payload against the named schema.
#
# Validation is recursive and fails on fields the schema does not declare.
# Delegates to Avro::SchemaValidator.validate!.
#
# @param payload [Object] the data to validate
# @param schema [Object] passed to #avro_schema to obtain the schema
# @return [Object] whatever Avro::SchemaValidator.validate! returns
def validate(payload, schema:)
  resolved_schema = avro_schema(schema)
  Avro::SchemaValidator.validate!(
    resolved_schema,
    payload,
    recursive: true,
    fail_on_extra_fields: true
  )
end