Class: AvroUtils

Inherits:
Object
  • Object
show all
Defined in:
lib/utils/avro_utils.rb

Class Method Summary collapse

Class Method Details

.all_names_valid?(string_schema) ⇒ Boolean

Returns:

  • (Boolean)


76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/utils/avro_utils.rb', line 76

# Checks that every "name" value found in a JSON schema string is made only of
# ASCII letters, digits and underscores.
#
# Names are located textually (no JSON parsing), so any `"name"` key in the
# string is inspected.
#
# @param string_schema [String] the schema serialized as JSON
# @return [Boolean] true when no name contains an invalid character
# @raise [InvalidDataException] listing each offending name together with the
#   invalid characters found in it
def all_names_valid?(string_schema)
  # Allow optional whitespace around the colon (pretty-printed JSON) and capture
  # the whole value, so that one-character names and names that *start* with an
  # invalid character are checked instead of being skipped.
  names = string_schema.scan(/"name"\s*:\s*"([^"]*)/).flatten
  invalid_pt = /[^A-Za-z0-9_]/

  result = names.each_with_object({}) do |name, found|
    invalid_chars = name.scan(invalid_pt)
    found[name] = invalid_chars if invalid_chars.any?
  end

  if result.any?
    error = "The following names have invalid characters:\n"
    details = result.map { |key, value| "#{key}: #{value}\n" }.join

    raise InvalidDataException.new(error + details)
  end

  true
end

.avro_schema(hash_data, record_type_name) ⇒ Object



26
27
28
# File 'lib/utils/avro_utils.rb', line 26

# Builds a parsed Avro schema for the given data.
#
# The schema description is produced by avro_schema_hash, serialized to JSON
# and handed to Avro::Schema.parse.
#
# @param hash_data [Hash] sample data the schema is derived from
# @param record_type_name [String] name given to the top-level record
# @return [Object] whatever Avro::Schema.parse returns for the generated JSON
def avro_schema(hash_data, record_type_name)
  schema_json = avro_schema_hash(hash_data, record_type_name).to_json
  Avro::Schema.parse(schema_json)
end

.avro_schema_hash(hash_data, record_type_name) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/utils/avro_utils.rb', line 4

# Derives an Avro record description (as a plain Hash) from sample data.
#
# Each key/value pair becomes one field:
# * a Hash value becomes a nested record, named after the singularized,
#   camelized key;
# * an Array value becomes an array type whose item type is looked up from the
#   class of its first element;
# * any other value is looked up in RUBY_AVRO_TYPE_MAPPING by its class name.
#
# The work runs inside process_data (defined elsewhere in this file).
#
# @param hash_data [Hash] sample data to describe
# @param record_type_name [String] name of the record being described
# @return [Hash] hash with :type, :name and :fields keys (as yielded to
#   process_data; presumably returned unchanged by it)
def avro_schema_hash(hash_data, record_type_name)
  process_data do
    fields = []

    hash_data.each do |key, value|
      field_type =
        case value
        when Hash
          avro_schema_hash(value, key.to_s.singularize.camelize)
        when Array
          # Only the first element is inspected to decide the item type.
          { type: 'array', items: RUBY_AVRO_TYPE_MAPPING[value[0].class.name.to_sym] }
        else
          RUBY_AVRO_TYPE_MAPPING[value.class.name.to_sym]
        end

      fields << { name: key, type: field_type }
    end

    { type: 'record', name: record_type_name, fields: fields }
  end
end

.bulk_json_to_avro(collection, record_type_name, filename = nil) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/utils/avro_utils.rb', line 47

# Serializes a collection of JSON documents into a single Avro data file.
#
# The schema is inferred from the first element of the collection and every
# element is then written with that schema.
#
# @param collection [Enumerable] JSON strings or Hashes to serialize; the first
#   element is used to infer the schema
# @param record_type_name [String] name of the top-level Avro record
# @param filename [String, nil] path of the file to write; when nil the data is
#   written to an in-memory StringIO instead
# @return [StringIO, File] a rewound StringIO, or the (already closed) File when
#   a filename was given
def bulk_json_to_avro(collection, record_type_name, filename = nil)
  process_data do
    buffer = filename.nil? ? StringIO.new : File.new(filename, 'wb')

    begin
      schema = avro_schema(hash_with_string_keys(collection.first), record_type_name)
      file_writer = Avro::DataFile::Writer.new(buffer, Avro::IO::DatumWriter.new(schema), schema)

      collection.each do |json_data|
        file_writer << hash_with_string_keys(json_data)
      end

      file_writer.flush
      buffer.rewind
    ensure
      # Close the file handle even when schema inference or writing raises, so a
      # failed conversion does not leak an open descriptor.
      buffer.close if buffer.is_a?(File) && !buffer.closed?
    end

    buffer
  end
end

.hash_with_string_keys(json_data) ⇒ Object

Avro assumes that all hashes use strings for keys and does not accept an indifferent-access hash, so the conversion below ensures that the hash data uses string keys.



102
103
104
105
106
107
# File 'lib/utils/avro_utils.rb', line 102

# Normalizes the input into a Hash produced by JSON.parse.
#
# A Hash argument is first serialized with to_json and then parsed back; any
# other argument is passed to JSON.parse as is.
#
# @param json_data [Hash, String] a Hash or a JSON document
# @return [Object] the result of JSON.parse (as yielded to process_data)
def hash_with_string_keys(json_data)
  process_data do
    serialized = json_data.is_a?(Hash) ? json_data.to_json : json_data
    JSON.parse(serialized)
  end
end

.is_valid_schema?(schema) ⇒ Boolean

Returns:

  • (Boolean)


67
68
69
70
71
72
73
74
# File 'lib/utils/avro_utils.rb', line 67

# Validates an Avro schema given as a Hash, an Array or a JSON string.
#
# The schema is first checked with all_names_valid? and then parsed with
# Avro::Schema.parse; errors from either step propagate to process_data.
#
# @param schema [Hash, Array, String] the schema to validate
# @return [Boolean] true when both checks complete without raising
def is_valid_schema?(schema)
  process_data do
    # Use is_a? rather than an exact class comparison so that Hash/Array
    # subclasses (e.g. an indifferent-access hash) are serialized to JSON too,
    # instead of being handed to the string-based checks as-is.
    structured = schema.is_a?(Hash) || schema.is_a?(Array)
    working_schema = structured ? schema.to_json : schema.dup
    all_names_valid?(working_schema)
    Avro::Schema.parse(working_schema)
    true
  end
end

.json_to_avro(json_data, record_type_name, filename = nil) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/utils/avro_utils.rb', line 30

# Serializes a single JSON document into an Avro data file.
#
# The schema is inferred from the document itself.
#
# @param json_data [Hash, String] the document to serialize
# @param record_type_name [String] name of the top-level Avro record
# @param filename [String, nil] path of the file to write; when nil the data is
#   written to an in-memory StringIO instead
# @return [StringIO, File] a rewound StringIO, or the (already closed) File when
#   a filename was given
def json_to_avro(json_data, record_type_name, filename = nil)
  process_data do
    buffer = filename.nil? ? StringIO.new : File.new(filename, 'wb')

    begin
      hash_data = hash_with_string_keys(json_data)
      schema = avro_schema(hash_data, record_type_name)

      file_writer = Avro::DataFile::Writer.new(buffer, Avro::IO::DatumWriter.new(schema), schema)
      file_writer << hash_data
      file_writer.flush

      buffer.rewind
    ensure
      # Close the file handle even when parsing, schema inference or writing
      # raises, so a failed conversion does not leak an open descriptor.
      buffer.close if buffer.is_a?(File) && !buffer.closed?
    end

    buffer
  end
end