Class: Deimos::AvroSchemaCoercer

Inherits:
Object
  • Object
show all
Defined in:
lib/deimos/schema_backends/avro_schema_coercer.rb

Overview

Class to coerce values in a payload to match a schema.

Instance Method Summary collapse

Constructor Details

#initialize(schema) ⇒ AvroSchemaCoercer



9
10
11
# File 'lib/deimos/schema_backends/avro_schema_coercer.rb', line 9

# Build a coercer bound to one schema.
#
# @param schema [Object] schema stored in @schema for this instance
#   (presumably an Avro schema object — confirm against callers)
def initialize(schema)
  @schema = schema
end

Instance Method Details

#coerce_record(type, val) ⇒ Object

Coerce sub-records in a payload to match the schema.



67
68
69
70
71
72
73
74
75
# File 'lib/deimos/schema_backends/avro_schema_coercer.rb', line 67

# Coerce every entry of a record payload to the type declared by the
# matching schema field.
#
# @param type [#fields] record schema; each field responds to +name+ and +type+
# @param val [Hash, nil] payload keyed by field name (String or Symbol keys)
# @return [Hash, nil] a new hash with the payload's original keys and the
#   coerced values, or nil when +val+ is nil
def coerce_record(type, val)
  return nil if val.nil?

  val.each_with_object({}) do |(name, value), coerced|
    # Compare names as strings so symbol-keyed payloads resolve to their
    # schema fields as well.
    field_name = name.to_s
    field = type.fields.find { |f| f.name.to_s == field_name }

    # A key with no schema field has no target type to coerce to; keep the
    # value untouched rather than calling #type on nil.
    coerced[name] = field ? coerce_type(field.type, value) : value
  end
end

#coerce_type(type, val) ⇒ Object

Coerce values in a payload to match the schema.



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/deimos/schema_backends/avro_schema_coercer.rb', line 81

# Coerce a single value to the Ruby representation expected by a schema type.
#
# Values that cannot be coerced are handed back unchanged (the original
# author notes these "will fail", presumably during later validation).
#
# @param type [#type] schema node; +type.type+ names the schema kind and
#   +type.logical_type+ is consulted for :int / :long
# @param val [Object] value to coerce
# @return [Object] the coerced value, or +val+ itself when no rule applies
def coerce_type(type, val)
  time_like = [Time, ActiveSupport::TimeWithZone]

  case type.type.to_sym
  when :int, :long
    # Timestamp logical types are passed through untouched.
    return val if %w(timestamp-millis timestamp-micros).include?(type.logical_type)

    integer_like = val.is_a?(Integer) ||
                   _is_integer_string?(val) ||
                   time_like.any? { |klass| val.is_a?(klass) }
    integer_like ? val.to_i : val
  when :float, :double
    float_like = val.is_a?(Numeric) || _is_float_string?(val)
    float_like ? val.to_f : val
  when :string
    stringable = val.respond_to?(:to_str) || _is_to_s_defined?(val)
    stringable ? val.to_s : val
  when :boolean
    # Only nil and false map to false; every other value becomes true.
    !(val.nil? || val == false)
  when :union
    coerce_union(type, val)
  when :record
    coerce_record(type, val)
  else
    val
  end
end

#coerce_union(type, val) ⇒ Object

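Coerce a value against a union schema: return nil for a nil value when the union allows null; otherwise find the member schema that fits the value and coerce the value to it.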



17
18
19
20
21
22
23
# File 'lib/deimos/schema_backends/avro_schema_coercer.rb', line 17

# Coerce a value against a union schema.
#
# A nil value is returned as nil when the union has a :null member.
# Otherwise the member schema chosen by find_schema_type is used to coerce
# the value via coerce_type; errors from either call propagate.
#
# @param type [#schemas] union schema whose members respond to +type+
# @param val [Object, nil] value to coerce
# @return [Object, nil] the coerced value
def coerce_union(type, val)
  member_kinds = type.schemas.map { |member| member.type.to_sym }

  if val.nil? && member_kinds.include?(:null)
    nil
  else
    coerce_type(find_schema_type(type, val), val)
  end
end

#find_schema_type(type, val) ⇒ Avro::Schema::PrimitiveSchema

Find the right schema for val from a UnionSchema.



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/deimos/schema_backends/avro_schema_coercer.rb', line 29

# Pick the first member schema of a union that can hold +val+.
#
# Members are tried in declaration order:
# * :int / :long     - Integer, integer-looking string, Time or TimeWithZone
# * :float / :double - Numeric or float-looking string
# * :array           - Array
# * :record          - Hash whose keys are all names of the record's fields
# * anything else    - accepted unless the member is :null
#
# @param type [#schemas] union schema
# @param val [Object] value to find a member schema for
# @return [Object] the first matching member schema
# @raise [RuntimeError] when no member schema accepts +val+
def find_schema_type(type, val)
  time_like = [Time, ActiveSupport::TimeWithZone]

  type.schemas.each do |candidate|
    kind = candidate.type.to_sym

    accepted =
      case kind
      when :int, :long
        val.is_a?(Integer) ||
          _is_integer_string?(val) ||
          time_like.any? { |klass| val.is_a?(klass) }
      when :float, :double
        val.is_a?(Numeric) || _is_float_string?(val)
      when :array
        val.is_a?(Array)
      when :record
        # A non-hash can never be a record; for hashes every key must be
        # one of the record's field names.
        val.is_a?(Hash) &&
          Set.new(val.keys).subset?(Set.new(candidate.fields.map(&:name)))
      else
        kind != :null
      end

    return candidate if accepted
  end

  raise "No Schema type found for VALUE: #{val}\n TYPE: #{type}"
end