Class: Avromatic::IO::DatumReader
- Inherits:
-
Avro::IO::DatumReader
- Object
- Avro::IO::DatumReader
- Avromatic::IO::DatumReader
- Defined in:
- lib/avromatic/io/datum_reader.rb
Overview
Subclass DatumReader to include additional information about the union member index used. The code modified below is based on salsify/avro, branch ‘salsify-master’ with the tag ‘v1.9.0.3’
Constant Summary collapse
- UNION_MEMBER_INDEX =
Avromatic::IO::UNION_MEMBER_INDEX
Instance Method Summary collapse
- #read_data(writers_schema, readers_schema, decoder, initial_record = nil) ⇒ Object
-
#read_record(writers_schema, readers_schema, decoder, initial_record = {}) ⇒ Object
Override to specify an initial record that may contain union index.
Instance Method Details
#read_data(writers_schema, readers_schema, decoder, initial_record = nil) ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/avromatic/io/datum_reader.rb', line 13 def read_data(writers_schema, readers_schema, decoder, initial_record = nil) # schema matching unless self.class.match_schemas(writers_schema, readers_schema) raise Avro::IO::SchemaMatchException.new(writers_schema, readers_schema) end # schema resolution: reader's schema is a union, writer's schema is not if writers_schema.type_sym != :union && readers_schema.type_sym == :union rs_index = readers_schema.schemas.find_index do |s| self.class.match_schemas(writers_schema, s) end optional = readers_schema.schemas.first.type_sym == :null union_info = if readers_schema.schemas.size == 2 && optional # Avromatic does not treat the union of null and 1 other type as a union {} elsif optional # Avromatic does not treat the null of an optional field as part of the union { UNION_MEMBER_INDEX => rs_index - 1 } else { UNION_MEMBER_INDEX => rs_index } end return read_data(writers_schema, readers_schema.schemas[rs_index], decoder, union_info) if rs_index raise Avro::IO::SchemaMatchException.new(writers_schema, readers_schema) end # function dispatch for reading data based on type of writer's schema datum = case writers_schema.type_sym when :null; decoder.read_null when :boolean; decoder.read_boolean when :string; decoder.read_string when :int; decoder.read_int when :long; decoder.read_long when :float; decoder.read_float when :double; decoder.read_double when :bytes; decoder.read_bytes when :fixed; read_fixed(writers_schema, readers_schema, decoder) when :enum; read_enum(writers_schema, readers_schema, decoder) when :array; read_array(writers_schema, readers_schema, decoder) when :map; read_map(writers_schema, readers_schema, decoder) when :union; read_union(writers_schema, readers_schema, decoder) when :record, :error, :request; read_record(writers_schema, readers_schema, decoder, initial_record || {}) else raise Avro::AvroError.new("Cannot read unknown schema type: #{writers_schema.type}") end # Allow this code to be used with an official Avro release or the # avro-patches gem that includes logical_type support. if readers_schema.respond_to?(:logical_type) readers_schema.type_adapter.decode(datum) else datum end end |
#read_record(writers_schema, readers_schema, decoder, initial_record = {}) ⇒ Object
Override to specify an initial record that may contain union index
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/avromatic/io/datum_reader.rb', line 70 def read_record(writers_schema, readers_schema, decoder, initial_record = {}) readers_fields_hash = readers_schema.fields_hash read_record = Avromatic.use_custom_datum_reader ? initial_record : {} writers_schema.fields.each do |field| readers_field = readers_fields_hash[field.name] if readers_field field_val = read_data(field.type, readers_field.type, decoder) read_record[field.name] = field_val else skip_data(field.type, decoder) end end read_record end |