Class: AvroUtils
- Inherits:
-
Object
- Object
- AvroUtils
- Defined in:
- lib/utils/avro_utils.rb
Class Method Summary collapse
- .avro_schema(hash_data, record_type_name) ⇒ Object
- .avro_schema_hash(hash_data, record_type_name) ⇒ Object
- .json_to_avro(json_data, record_type_name) ⇒ Object
Class Method Details
.avro_schema(hash_data, record_type_name) ⇒ Object
27 28 29 |
# File 'lib/utils/avro_utils.rb', line 27 def avro_schema(hash_data, record_type_name) Avro::Schema.parse(avro_schema_hash(hash_data, record_type_name).to_json) end |
.avro_schema_hash(hash_data, record_type_name) ⇒ Object
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/utils/avro_utils.rb', line 4 def avro_schema_hash(hash_data, record_type_name) result = { type: 'record', name: record_type_name, fields: [] } hash_data.each do |key, value| if value.is_a? Hash result[:fields] << { name: key, type: avro_schema_hash(value, key.to_s.singularize.camelize) } elsif value.is_a? Array result[:fields] << { name: key, type: { type: 'array', items: RUBY_AVRO_TYPE_MAPPING[value[0].class.name.to_sym] } } else result[:fields] << { name: key, type: RUBY_AVRO_TYPE_MAPPING[value.class.name.to_sym] } end end result rescue StandardError => error puts("Error: #{error.}\n#{error.backtrace.join("\n")}") raise InvalidDataException.new(error.) end |
.json_to_avro(json_data, record_type_name) ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/utils/avro_utils.rb', line 31 def json_to_avro(json_data, record_type_name) buffer = StringIO.new # Avro assumes that all the hashes use strings for keys. It does not accept Indifferent Hash. # So the conversion bellow ensures that the hash data uses strings for keys. json_data = json_data.to_json if json_data.is_a? Hash hash_data = JSON.parse(json_data) schema = avro_schema(hash_data, record_type_name) datum_writer = Avro::IO::DatumWriter.new(schema) file_writer = Avro::DataFile::Writer.new(buffer, datum_writer, schema) file_writer << hash_data file_writer.flush buffer.rewind buffer rescue StandardError => error puts("Error: #{error.}\n#{error.backtrace.join("\n")}") raise InvalidDataException.new(error.) end |