Class: AvroUtils
- Inherits:
-
Object
- Object
- AvroUtils
- Defined in:
- lib/utils/avro_utils.rb
Class Method Summary collapse
- .all_names_valid?(string_schema) ⇒ Boolean
- .avro_schema(hash_data, record_type_name) ⇒ Object
- .avro_schema_hash(hash_data, record_type_name) ⇒ Object
- .bulk_json_to_avro(collection, record_type_name, filename = nil) ⇒ Object
-
.hash_with_string_keys(json_data) ⇒ Object
Avro assumes that all the hashes use strings for keys.
- .is_valid_schema?(schema) ⇒ Boolean
- .json_to_avro(json_data, record_type_name, filename = nil) ⇒ Object
Class Method Details
.all_names_valid?(string_schema) ⇒ Boolean
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/utils/avro_utils.rb', line 76
#
# Checks that every "name" value found in a JSON schema string consists only
# of the characters A-Z, a-z, 0-9 and underscore.
#
# @param string_schema [String] the schema serialized as JSON text
# @return [Boolean] true when no name contains an invalid character
# @raise [InvalidDataException] listing each offending name together with
#   the invalid characters found in it
def all_names_valid?(string_schema)
  # Optional whitespace is allowed around the colon and the whole value is
  # captured, so pretty-printed JSON, single-character names and names that
  # *start* with an invalid character are inspected as well.
  names = string_schema.scan(/"name"\s*:\s*"([^"]*)/).flatten
  invalid_pt = /[^A-Za-z0-9_]/

  # name => array of offending characters (duplicate names collapse to one entry)
  result = names.each_with_object({}) do |name, found|
    invalid_chars = name.scan(invalid_pt)
    found[name] = invalid_chars if invalid_chars.any?
  end
  return true if result.empty?

  details = result.map { |key, value| "#{key}: #{value}\n" }.join
  raise InvalidDataException, "The following names have invalid characters:\n" + details
end
.avro_schema(hash_data, record_type_name) ⇒ Object
26 27 28 |
# File 'lib/utils/avro_utils.rb', line 26
#
# Builds the schema hash for +hash_data+ with avro_schema_hash, serializes
# it to JSON and passes that JSON to Avro::Schema.parse.
#
# @param hash_data [Hash] data from which the schema is derived
# @param record_type_name [String] forwarded to avro_schema_hash as the record name
# @return [Object] the result of Avro::Schema.parse
def avro_schema(hash_data, record_type_name)
  schema_json = avro_schema_hash(hash_data, record_type_name).to_json
  Avro::Schema.parse(schema_json)
end
.avro_schema_hash(hash_data, record_type_name) ⇒ Object
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/utils/avro_utils.rb', line 4
#
# Derives a record-schema hash (+type+, +name+, +fields+) from a data hash.
# Each key becomes one field:
# * a Hash value recurses, using the singularized, camelized key as the
#   nested record name;
# * an Array value becomes an 'array' type whose item type is looked up from
#   the class of the first element only;
# * anything else is looked up in RUBY_AVRO_TYPE_MAPPING by class name.
#
# The work runs inside the process_data block helper (defined elsewhere).
#
# NOTE(review): an empty array, or an array of hashes, is looked up as
# :NilClass / :Hash in RUBY_AVRO_TYPE_MAPPING - confirm the mapping covers
# those cases.
#
# @param hash_data [Hash] data to describe
# @param record_type_name [String] value stored under :name
# @return [Hash] presumably the schema hash, assuming process_data returns
#   the block's value - TODO confirm
def avro_schema_hash(hash_data, record_type_name)
  process_data do
    field_specs = hash_data.map do |key, value|
      field_type =
        case value
        when Hash
          avro_schema_hash(value, key.to_s.singularize.camelize)
        when Array
          { type: 'array', items: RUBY_AVRO_TYPE_MAPPING[value[0].class.name.to_sym] }
        else
          RUBY_AVRO_TYPE_MAPPING[value.class.name.to_sym]
        end

      { name: key, type: field_type }
    end

    { type: 'record', name: record_type_name, fields: field_specs }
  end
end
.bulk_json_to_avro(collection, record_type_name, filename = nil) ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/utils/avro_utils.rb', line 47
#
# Writes every element of +collection+ into a single Avro data file. The
# schema is inferred from the first element only and applied to all of them.
#
# The schema is built *before* the output file is opened, so invalid input
# no longer truncates an existing file, and the file handle is closed even
# when writing raises.
#
# The work runs inside the process_data block helper (defined elsewhere).
#
# @param collection [Enumerable] records as Hashes or JSON strings
# @param record_type_name [String] name of the top-level Avro record
# @param filename [String, nil] target path; when nil an in-memory StringIO is used
# @return [StringIO, File] the rewound StringIO, or the (already closed) File -
#   assuming process_data returns the block's value, TODO confirm
def bulk_json_to_avro(collection, record_type_name, filename = nil)
  process_data do
    schema = avro_schema(hash_with_string_keys(collection.first), record_type_name)
    buffer = filename.nil? ? StringIO.new : File.new(filename, 'wb')

    begin
      file_writer = Avro::DataFile::Writer.new(buffer, Avro::IO::DatumWriter.new(schema), schema)
      collection.each do |json_data|
        file_writer << hash_with_string_keys(json_data)
      end
      file_writer.flush
      buffer.rewind
    ensure
      # Only real files are closed; a StringIO is handed back open for reading.
      buffer.close if buffer.is_a?(File) && !buffer.closed?
    end

    buffer
  end
end
.hash_with_string_keys(json_data) ⇒ Object
Avro assumes that all the hashes use strings for keys. It does not accept Indifferent Hash, so the conversion below ensures that the hash data uses strings for keys.
102 103 104 105 106 107 |
# File 'lib/utils/avro_utils.rb', line 102
#
# Round-trips the input through JSON so the parsed result uses string keys:
# a Hash is first serialized with to_json, anything else is passed to
# JSON.parse as-is.
#
# The work runs inside the process_data block helper (defined elsewhere).
#
# @param json_data [Hash, String] a hash, or text that is already JSON
# @return [Object] presumably the JSON.parse result, assuming process_data
#   returns the block's value - TODO confirm
def hash_with_string_keys(json_data)
  process_data do
    json_text = json_data.is_a?(Hash) ? json_data.to_json : json_data
    JSON.parse(json_text)
  end
end
.is_valid_schema?(schema) ⇒ Boolean
67 68 69 70 71 72 73 74 |
# File 'lib/utils/avro_utils.rb', line 67
#
# Validates a schema in two steps: all_names_valid? checks the characters
# used in names, then Avro::Schema.parse parses the schema text.
#
# Hash and Array inputs - including subclasses of either - are serialized
# to JSON first. Any other input is duplicated and used as the schema text.
#
# The work runs inside the process_data block helper (defined elsewhere).
#
# @param schema [Hash, Array, String] schema as a Ruby structure or JSON text
# @return [Boolean] true when both checks complete without raising
#   (assuming process_data returns the block's value - TODO confirm)
def is_valid_schema?(schema)
  process_data do
    working_schema =
      case schema
      when Hash, Array then schema.to_json # also matches subclasses
      else schema.dup
      end

    all_names_valid?(working_schema)
    Avro::Schema.parse(working_schema)
    true
  end
end
.json_to_avro(json_data, record_type_name, filename = nil) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/utils/avro_utils.rb', line 30
#
# Writes a single record into an Avro data file, inferring the schema from
# the record itself.
#
# The record is normalized and the schema built *before* the output file is
# opened, so invalid input no longer truncates an existing file, and the
# file handle is closed even when writing raises.
#
# The work runs inside the process_data block helper (defined elsewhere).
#
# @param json_data [Hash, String] the record as a Hash or JSON text
# @param record_type_name [String] name of the top-level Avro record
# @param filename [String, nil] target path; when nil an in-memory StringIO is used
# @return [StringIO, File] the rewound StringIO, or the (already closed) File -
#   assuming process_data returns the block's value, TODO confirm
def json_to_avro(json_data, record_type_name, filename = nil)
  process_data do
    hash_data = hash_with_string_keys(json_data)
    schema = avro_schema(hash_data, record_type_name)
    buffer = filename.nil? ? StringIO.new : File.new(filename, 'wb')

    begin
      file_writer = Avro::DataFile::Writer.new(buffer, Avro::IO::DatumWriter.new(schema), schema)
      file_writer << hash_data
      file_writer.flush
      buffer.rewind
    ensure
      # Only real files are closed; a StringIO is handed back open for reading.
      buffer.close if buffer.is_a?(File) && !buffer.closed?
    end

    buffer
  end
end