Class: AvroUtils

Inherits:
Object
Defined in:
lib/utils/avro_utils.rb

Class Method Summary

Class Method Details

.avro_schema(hash_data, record_type_name) ⇒ Object



27
28
29
# File 'lib/utils/avro_utils.rb', line 27

# Builds a parsed Avro schema describing the shape of +hash_data+.
#
# The schema hash produced by +avro_schema_hash+ is serialized to JSON and
# handed to +Avro::Schema.parse+.
#
# @param hash_data [Hash] sample data whose keys/values define the schema
# @param record_type_name [String] name of the generated top-level record
# @return [Object] the schema object returned by Avro::Schema.parse
def avro_schema(hash_data, record_type_name)
  schema_json = avro_schema_hash(hash_data, record_type_name).to_json
  Avro::Schema.parse(schema_json)
end

.avro_schema_hash(hash_data, record_type_name) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/utils/avro_utils.rb', line 4

# Builds an Avro record schema (as a plain Ruby Hash) by inspecting the values
# of +hash_data+. Each key becomes one field of the record:
#
# * Hash values become nested records named after the key
#   (+key.to_s.singularize.camelize+).
# * Array values become Avro arrays. If the first element is a Hash, the items
#   are a nested record named the same way; otherwise the item type is looked
#   up in RUBY_AVRO_TYPE_MAPPING from the first element's class name. Only the
#   first element is inspected, so arrays are assumed to be homogeneous.
# * Any other value's type is looked up in RUBY_AVRO_TYPE_MAPPING by its class
#   name.
#
# @param hash_data [Hash] sample data whose shape defines the schema
# @param record_type_name [String] name of the generated record
# @return [Hash] schema hash with :type, :name and :fields keys
# @raise [InvalidDataException] if the schema cannot be built. An
#   InvalidDataException raised by a nested call is propagated unchanged, so
#   the failure is logged once and its original cause is preserved.
def avro_schema_hash(hash_data, record_type_name)
  fields = hash_data.map do |key, value|
    field_type =
      case value
      when Hash
        avro_schema_hash(value, key.to_s.singularize.camelize)
      when Array
        first_item = value.first
        items_type =
          if first_item.is_a?(Hash)
            # Arrays of objects need a nested record schema, not a primitive lookup.
            avro_schema_hash(first_item, key.to_s.singularize.camelize)
          else
            RUBY_AVRO_TYPE_MAPPING[first_item.class.name.to_sym]
          end
        { type: 'array', items: items_type }
      else
        RUBY_AVRO_TYPE_MAPPING[value.class.name.to_sym]
      end

    { name: key, type: field_type }
  end

  { type: 'record', name: record_type_name, fields: fields }
rescue InvalidDataException
  # Raised (and already logged) by a nested call; do not log or wrap it again.
  raise
rescue StandardError => error
  puts("Error: #{error.message}\n#{error.backtrace.join("\n")}")
  raise InvalidDataException.new(error.message)
end

.json_to_avro(json_data, record_type_name) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/utils/avro_utils.rb', line 31

# Serializes one record into an in-memory Avro data file (object container).
#
# The schema is derived from the data itself via +avro_schema+, so field types
# are inferred from the values present in +json_data+.
#
# @param json_data [String, Hash] a JSON document, or a Hash that is
#   round-tripped through JSON first
# @param record_type_name [String] name of the top-level Avro record
# @return [StringIO] binary buffer holding the Avro data file, rewound to the start
# @raise [InvalidDataException] if the data cannot be parsed, described or
#   encoded. An InvalidDataException raised by the schema helpers is propagated
#   unchanged rather than being logged and wrapped a second time.
def json_to_avro(json_data, record_type_name)
  # Avro assumes that all hashes use strings for keys and does not accept
  # indifferent-access hashes. Round-tripping a Hash through JSON guarantees
  # plain String keys.
  json_data = json_data.to_json if json_data.is_a?(Hash)
  hash_data = JSON.parse(json_data)

  schema = avro_schema(hash_data, record_type_name)

  # Avro output is binary. String.new is an ASCII-8BIT string, so the buffer's
  # bytes are not tagged with the default (usually UTF-8) encoding.
  buffer = StringIO.new(String.new)
  datum_writer = Avro::IO::DatumWriter.new(schema)
  file_writer  = Avro::DataFile::Writer.new(buffer, datum_writer, schema)
  file_writer << hash_data
  # flush rather than close: Writer#close also closes the underlying IO, and the
  # StringIO must stay open so it can be rewound and returned.
  file_writer.flush

  buffer.rewind
  buffer
rescue InvalidDataException
  # Already logged by the helper that raised it; do not log or wrap it again.
  raise
rescue StandardError => error
  puts("Error: #{error.message}\n#{error.backtrace.join("\n")}")
  raise InvalidDataException.new(error.message)
end