Class: Deimos::AvroSchemaCoercer

Inherits:
Object
  • Object
show all
Defined in:
lib/deimos/schema_backends/avro_schema_coercer.rb

Overview

Class to coerce values in a payload to match a schema.

Instance Method Summary

Constructor Details

#initialize(schema) ⇒ AvroSchemaCoercer

Returns a new instance of AvroSchemaCoercer.

Parameters:

  • schema (Avro::Schema)


9
10
11
# File 'lib/deimos/schema_backends/avro_schema_coercer.rb', line 9

# Keeps a reference to the given schema in @schema.
#
# @param schema [Avro::Schema]
def initialize(schema)
  @schema = schema
end

Instance Method Details

#coerce_boolean(val) ⇒ Boolean

Parameters:

  • val (Object)

Returns:

  • (Boolean)


115
116
117
# File 'lib/deimos/schema_backends/avro_schema_coercer.rb', line 115

# Collapse an arbitrary value into a strict true/false.
#
# Only nil and values equal to false map to false; everything else
# (including 0, "" and the string "false") maps to true.
#
# @param val [Object]
# @return [Boolean]
def coerce_boolean(val) # rubocop:disable Naming/PredicateMethod
  return false if val.nil?

  val != false
end

#coerce_float(val) ⇒ Float

Parameters:

  • val (Object)

Returns:

  • (Float)


95
96
97
98
99
100
101
# File 'lib/deimos/schema_backends/avro_schema_coercer.rb', line 95

# Convert a numeric value, or a string accepted by _is_float_string?,
# to a Float.
#
# @param val [Object]
# @return [Float, Object] val.to_f when val is convertible; otherwise val
#   itself, unchanged
def coerce_float(val)
  return val.to_f if val.is_a?(Numeric) || _is_float_string?(val)

  # Not convertible: hand the value back untouched. Per the original
  # author's note this is expected to fail later on.
  val
end

#coerce_int(type, val) ⇒ Integer

Parameters:

  • type (Avro::Schema)
  • val (Object)

Returns:

  • (Integer)


80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/deimos/schema_backends/avro_schema_coercer.rb', line 80

# Convert a value to an Integer when it is integer-like.
#
# Values for schemas whose logical type is timestamp-millis or
# timestamp-micros are returned untouched. Otherwise Integers, strings
# accepted by _is_integer_string?, and Time / ActiveSupport::TimeWithZone
# instances are converted with #to_i.
#
# @param type [Avro::Schema] must respond to #logical_type
# @param val [Object]
# @return [Integer, Object] the converted integer, or val unchanged
def coerce_int(type, val)
  time_classes = [Time, ActiveSupport::TimeWithZone]
  return val if %w(timestamp-millis timestamp-micros).include?(type.logical_type)

  integer_like = val.is_a?(Integer) ||
                 _is_integer_string?(val) ||
                 time_classes.any? { |klass| val.is_a?(klass) }

  # Anything not integer-like is handed back as-is; per the original
  # author's note this is expected to fail later on.
  integer_like ? val.to_i : val
end

#coerce_record(type, val) ⇒ Object

Coerce sub-records in a payload to match the schema.

Parameters:

  • type (Avro::Schema::RecordSchema)
  • val (Object)

Returns:

  • (Object)


67
68
69
70
71
72
73
74
75
# File 'lib/deimos/schema_backends/avro_schema_coercer.rb', line 67

# Coerce the values of a record payload to match the record schema.
#
# Each value is coerced (via coerce_type) against the schema field whose
# name equals its key. A key that has no matching field keeps its value
# unchanged, rather than raising NoMethodError from calling #type on a nil
# field lookup; what happens to such extra keys is left to whatever
# consumes the coerced payload.
#
# @param type [Avro::Schema::RecordSchema] must respond to #fields, whose
#   entries respond to #name and #type
# @param val [Hash, nil] payload keyed by field name
# @return [Hash, nil] a new hash with the same keys in the same order and
#   coerced values, or nil when val is nil
def coerce_record(type, val)
  return nil if val.nil?

  # Index the fields once instead of scanning them for every key.
  # `||=` keeps the first field seen for a name, as a linear find would.
  fields_by_name = type.fields.each_with_object({}) do |field, index|
    index[field.name] ||= field
  end

  val.map do |name, value|
    field = fields_by_name[name]
    [name, field ? coerce_type(field.type, value) : value]
  end.to_h
end

#coerce_string(val) ⇒ String

Parameters:

  • val (Object)

Returns:

  • (String)


105
106
107
108
109
110
111
# File 'lib/deimos/schema_backends/avro_schema_coercer.rb', line 105

# Convert a value to a String when it is string-like (responds to #to_str)
# or when _is_to_s_defined? accepts it.
#
# @param val [Object]
# @return [String, Object] val.to_s when convertible; otherwise val itself,
#   unchanged
def coerce_string(val)
  return val.to_s if val.respond_to?(:to_str) || _is_to_s_defined?(val)

  # Not convertible: hand the value back untouched. Per the original
  # author's note this is expected to fail later on.
  val
end

#coerce_type(type, val) ⇒ Object

Coerce values in a payload to match the schema.

Parameters:

  • type (Avro::Schema)
  • val (Object)

Returns:

  • (Object)


123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/deimos/schema_backends/avro_schema_coercer.rb', line 123

# Dispatch a value to the coercer that matches the schema's type.
#
# int/long, float/double, string, boolean, union and record schemas are
# routed to their coerce_* method; a value for any other schema type is
# returned unchanged.
#
# @param type [Avro::Schema] must respond to #type
# @param val [Object]
# @return [Object]
def coerce_type(type, val)
  case type.type.to_sym
  when :int, :long     then coerce_int(type, val)
  when :float, :double then coerce_float(val)
  when :string         then coerce_string(val)
  when :boolean        then coerce_boolean(val)
  when :union          then coerce_union(type, val)
  when :record         then coerce_record(type, val)
  else val
  end
end

#coerce_union(type, val) ⇒ Object

Coerce a value to match one of the member schemas of a union.

Parameters:

  • type (Avro::Schema::UnionSchema)
  • val (Object)

Returns:

  • (Object)


17
18
19
20
21
22
23
# File 'lib/deimos/schema_backends/avro_schema_coercer.rb', line 17

# Coerce a value against a union schema.
#
# A nil value is returned as nil when the union has a null member.
# Otherwise the member schema chosen by find_schema_type is used to coerce
# the value through coerce_type.
#
# @param type [Avro::Schema::UnionSchema] must respond to #schemas
# @param val [Object]
# @return [Object]
def coerce_union(type, val)
  nullable = type.schemas.any? { |member| member.type.to_sym == :null }
  return nil if val.nil? && nullable

  coerce_type(find_schema_type(type, val), val)
end

#find_schema_type(type, val) ⇒ Avro::Schema

Find the first member schema of a UnionSchema that can hold val.

Parameters:

  • type (Avro::Schema::UnionSchema)
  • val (Object)

Returns:

  • (Avro::Schema)


29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/deimos/schema_backends/avro_schema_coercer.rb', line 29

# Pick the first member schema of a union that can hold the value.
#
# Members are tried in declaration order:
# * int/long match Integers, strings accepted by _is_integer_string?, and
#   Time / ActiveSupport::TimeWithZone instances;
# * float/double match Numerics and strings accepted by _is_float_string?;
# * array matches Arrays;
# * record matches Hashes whose keys are all names of the record's fields;
# * any other member type matches unless it is null.
#
# @param type [Avro::Schema::UnionSchema] must respond to #schemas
# @param val [Object]
# @return [Avro::Schema] the first matching member schema
# @raise [RuntimeError] when no member schema matches
def find_schema_type(type, val)
  time_classes = [Time, ActiveSupport::TimeWithZone]

  match = type.schemas.find do |candidate|
    kind = candidate.type.to_sym

    if %i(int long).include?(kind)
      val.is_a?(Integer) ||
        _is_integer_string?(val) ||
        time_classes.any? { |klass| val.is_a?(klass) }
    elsif %i(float double).include?(kind)
      val.is_a?(Numeric) || _is_float_string?(val)
    elsif kind == :array
      val.is_a?(Array)
    elsif kind == :record
      # A non-hash can never be a record, so keep looking in that case.
      # A hash qualifies only if it has no keys outside the record's fields.
      val.is_a?(Hash) &&
        Set.new(val.keys) <= Set.new(candidate.fields.map(&:name))
    else
      kind != :null
    end
  end

  return match unless match.nil?

  raise "No Schema type found for VALUE: #{val}\n TYPE: #{type}"
end