Class: Avro::IO::DatumReader

Inherits:
Object
  • Object
show all
Defined in:
lib/avro/io.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(writers_schema = nil, readers_schema = nil) ⇒ DatumReader

Returns a new instance of DatumReader.



229
230
231
232
# File 'lib/avro/io.rb', line 229

# Stores the two schemas this reader works with.
#
# @param writers_schema [Object, nil] schema the data was written with
# @param readers_schema [Object, nil] schema to resolve the data against;
#   kept exactly as given (including nil)
def initialize(writers_schema=nil, readers_schema=nil)
  @writers_schema, @readers_schema = writers_schema, readers_schema
end

Instance Attribute Details

#readers_schema ⇒ Object

Returns the value of attribute readers_schema.



227
228
229
# File 'lib/avro/io.rb', line 227

# Returns the current value of the @readers_schema instance variable.
def readers_schema
  @readers_schema
end

#writers_schema ⇒ Object

Returns the value of attribute writers_schema.



227
228
229
# File 'lib/avro/io.rb', line 227

# Returns the current value of the @writers_schema instance variable.
def writers_schema
  @writers_schema
end

Class Method Details

.match_schemas(writers_schema, readers_schema) ⇒ Object



223
224
225
# File 'lib/avro/io.rb', line 223

# Class-level convenience that forwards schema matching to
# Avro::SchemaCompatibility.
#
# @param writers_schema [Object] schema the data was written with
# @param readers_schema [Object] schema the data is being read as
# @return [Object] whatever Avro::SchemaCompatibility.match_schemas returns
def self.match_schemas(writers_schema, readers_schema)
  compatibility = Avro::SchemaCompatibility
  compatibility.match_schemas(writers_schema, readers_schema)
end

Instance Method Details

#read(decoder) ⇒ Object



234
235
236
237
# File 'lib/avro/io.rb', line 234

# Decodes one datum from +decoder+.
#
# When no reader's schema has been set, the writer's schema is assigned
# to it first, so the data is read back with the schema it was written
# with.
#
# @param decoder [Object] passed straight through to #read_data
# @return [Object] the result of #read_data
def read(decoder)
  unless readers_schema
    self.readers_schema = writers_schema
  end

  read_data(writers_schema, readers_schema, decoder)
end

#read_array(writers_schema, readers_schema, decoder) ⇒ Object



296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
# File 'lib/avro/io.rb', line 296

# Decodes an array by reading counted blocks of items until a block
# count of zero is read.
#
# @param writers_schema [Object] array schema of the writer; must respond to #items
# @param readers_schema [Object] array schema of the reader; must respond to #items
# @param decoder [Object] must respond to #read_long
# @return [Array] the decoded items, in the order they were read
def read_array(writers_schema, readers_schema, decoder)
  read_items = []
  block_count = decoder.read_long
  while block_count != 0
    if block_count < 0
      block_count = -block_count
      # A negative count is followed by one more long (the block size).
      # It is not needed when actually reading the items, so it is
      # consumed to keep the stream position correct and then discarded.
      decoder.read_long
    end
    block_count.times do
      read_items << read_data(writers_schema.items,
                              readers_schema.items,
                              decoder)
    end
    block_count = decoder.read_long
  end

  read_items
end

#read_data(writers_schema, readers_schema, decoder) ⇒ Object



239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
# File 'lib/avro/io.rb', line 239

# Reads one datum written with +writers_schema+ and resolves it against
# +readers_schema+.
#
# Steps, in order:
# 1. Raise SchemaMatchException unless the class-level match_schemas
#    accepts the pair.
# 2. If only the reader's schema is a union, pick the first union branch
#    that matches the writer's schema and recurse with it (raising
#    SchemaMatchException when no branch matches).
# 3. Otherwise dispatch on the writer's type and pass the raw value
#    through the reader schema's type adapter.
#
# @param writers_schema [Object] schema the data was written with
# @param readers_schema [Object] schema to resolve against
# @param decoder [Object] source of encoded values
# @return [Object] the decoded datum
# @raise [SchemaMatchException] when the schemas cannot be matched
# @raise [AvroError] when the writer's type is not one of the known types
def read_data(writers_schema, readers_schema, decoder)
  # schema matching
  raise SchemaMatchException.new(writers_schema, readers_schema) unless
    self.class.match_schemas(writers_schema, readers_schema)

  # schema resolution: reader's schema is a union, writer's schema is not
  writer_is_union = writers_schema.type_sym == :union
  if !writer_is_union && readers_schema.type_sym == :union
    matching_branch = readers_schema.schemas.find do |candidate|
      self.class.match_schemas(writers_schema, candidate)
    end
    raise SchemaMatchException.new(writers_schema, readers_schema) unless matching_branch
    return read_data(writers_schema, matching_branch, decoder)
  end

  # dispatch on the type of the writer's schema
  raw_value =
    case writers_schema.type_sym
    when :null    then decoder.read_null
    when :boolean then decoder.read_boolean
    when :string  then decoder.read_string
    when :int     then decoder.read_int
    when :long    then decoder.read_long
    when :float   then decoder.read_float
    when :double  then decoder.read_double
    when :bytes   then decoder.read_bytes
    when :fixed   then read_fixed(writers_schema, readers_schema, decoder)
    when :enum    then read_enum(writers_schema, readers_schema, decoder)
    when :array   then read_array(writers_schema, readers_schema, decoder)
    when :map     then read_map(writers_schema, readers_schema, decoder)
    when :union   then read_union(writers_schema, readers_schema, decoder)
    when :record, :error, :request
      read_record(writers_schema, readers_schema, decoder)
    else
      raise AvroError, "Cannot read unknown schema type: #{writers_schema.type}"
    end

  readers_schema.type_adapter.decode(raw_value)
end

#read_default_value(field_schema, default_value) ⇒ Object



372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
# File 'lib/avro/io.rb', line 372

# Converts a JSON-style default value into the Ruby value a reader would
# produce for +field_schema+.
#
# Containers are converted recursively: arrays item by item, maps value
# by value, records field by field. A union uses its first branch.
#
# For records, a field that is absent (nil) in +default_value+ falls back
# to that field's own default. An explicit +false+ is a real value and is
# kept as-is.
#
# @param field_schema [Object] schema of the field; must respond to
#   #type_sym and, depending on the type, #items, #values, #schemas,
#   #fields or #type
# @param default_value [Object] the default as parsed JSON, or
#   :no_default when the field declares none
# @return [Object] the converted default
# @raise [AvroError] when +default_value+ is :no_default or the schema
#   type is not recognised
def read_default_value(field_schema, default_value)
  if default_value == :no_default
    raise AvroError, "Missing data for #{field_schema} with no default"
  end

  case field_schema.type_sym
  when :null
    nil
  when :int, :long
    Integer(default_value)
  when :float, :double
    Float(default_value)
  when :boolean, :enum, :fixed, :string, :bytes
    default_value
  when :array
    default_value.map do |json_val|
      read_default_value(field_schema.items, json_val)
    end
  when :map
    default_value.each_with_object({}) do |(key, json_val), converted|
      converted[key] = read_default_value(field_schema.values, json_val)
    end
  when :union
    read_default_value(field_schema.schemas[0], default_value)
  when :record, :error
    field_schema.fields.each_with_object({}) do |field, converted|
      json_val = default_value[field.name]
      # Only a missing (nil) entry falls back to the field's own default;
      # a plain truthiness test would also discard an explicit `false`.
      json_val = field.default if json_val.nil?
      converted[field.name] = read_default_value(field.type, json_val)
    end
  else
    raise AvroError, "Unknown type: #{field_schema.type}"
  end
end

#read_enum(writers_schema, readers_schema, decoder) ⇒ Object



283
284
285
286
287
288
289
290
291
292
293
294
# File 'lib/avro/io.rb', line 283

# Reads an enum value: an int index from +decoder+ is looked up in the
# writer's symbol list and the symbol found there is returned.
#
# The reader's symbol list is consulted, but nothing is done with the
# answer yet (see the TODO), so a symbol unknown to the reader is
# returned unchanged.
#
# @param writers_schema [Object] must respond to #symbols
# @param readers_schema [Object] must respond to #symbols
# @param decoder [Object] must respond to #read_int
# @return [Object] the writer's symbol at the decoded index
def read_enum(writers_schema, readers_schema, decoder)
  ordinal = decoder.read_int
  written_symbol = writers_schema.symbols[ordinal]

  # TODO(jmhodges): figure out what unset means for resolution
  # schema resolution
  if !readers_schema.symbols.include?(written_symbol)
    # 'unset' here
  end

  written_symbol
end

#read_fixed(writers_schema, readers_schema, decoder) ⇒ Object



279
280
281
# File 'lib/avro/io.rb', line 279

# Reads a fixed value by asking +decoder+ for as many units as the
# writer's schema reports via #size. +readers_schema+ is not consulted.
#
# @return [Object] whatever decoder.read returns
def read_fixed(writers_schema, readers_schema, decoder)
  length = writers_schema.size
  decoder.read(length)
end

#read_map(writers_schema, readers_schema, decoder) ⇒ Object



315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
# File 'lib/avro/io.rb', line 315

# Decodes a map by reading counted blocks of key/value pairs until a
# block count of zero is read. Each key is read with decoder.read_string
# and each value with #read_data.
#
# @param writers_schema [Object] map schema of the writer; must respond to #values
# @param readers_schema [Object] map schema of the reader; must respond to #values
# @param decoder [Object] must respond to #read_long and #read_string
# @return [Hash] decoded pairs; a key read more than once keeps its last value
def read_map(writers_schema, readers_schema, decoder)
  read_items = {}
  block_count = decoder.read_long
  while block_count != 0
    if block_count < 0
      block_count = -block_count
      # A negative count is followed by one more long (the block size).
      # It is not needed when actually reading the entries, so it is
      # consumed to keep the stream position correct and then discarded.
      decoder.read_long
    end
    block_count.times do
      key = decoder.read_string
      read_items[key] = read_data(writers_schema.values,
                                  readers_schema.values,
                                  decoder)
    end
    block_count = decoder.read_long
  end

  read_items
end

#read_record(writers_schema, readers_schema, decoder) ⇒ Object



342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
# File 'lib/avro/io.rb', line 342

# Reads a record field by field, in the writer's field order.
#
# A writer field that the reader also declares (matched by name) is
# decoded with #read_data; any other writer field is skipped with
# #skip_data. Afterwards, if the reader declares more fields than were
# read, every reader field that the writer does not have is filled from
# its default via #read_default_value.
#
# @param writers_schema [Object] must respond to #fields and #fields_hash
# @param readers_schema [Object] must respond to #fields_hash
# @param decoder [Object] source of encoded values
# @return [Hash] field name => decoded (or default) value
# @raise [AvroError] when a reader-only field has no default
def read_record(writers_schema, readers_schema, decoder)
  readers_fields_hash = readers_schema.fields_hash
  record = {}

  writers_schema.fields.each do |writers_field|
    readers_field = readers_fields_hash[writers_field.name]
    if readers_field
      decoded = read_data(writers_field.type, readers_field.type, decoder)
      record[writers_field.name] = decoded
    else
      skip_data(writers_field.type, decoder)
    end
  end

  # nothing to fill in when every reader field was already read
  return record unless readers_fields_hash.size > record.size

  # fill in the default values
  writers_fields_hash = writers_schema.fields_hash
  readers_fields_hash.each do |field_name, readers_field|
    next if writers_fields_hash.has_key?(field_name)

    unless readers_field.default?
      raise AvroError, "Missing data for #{readers_field.type} with no default"
    end

    default_val = read_default_value(readers_field.type, readers_field.default)
    record[readers_field.name] = default_val
  end

  record
end

#read_union(writers_schema, readers_schema, decoder) ⇒ Object



335
336
337
338
339
340
# File 'lib/avro/io.rb', line 335

# Reads a union value: a long from +decoder+ selects one of the writer's
# branch schemas, and the value is then read with that branch via
# #read_data against the unchanged +readers_schema+.
#
# @param writers_schema [Object] must respond to #schemas
# @return [Object] the result of #read_data for the selected branch
def read_union(writers_schema, readers_schema, decoder)
  branch_index = decoder.read_long
  writers_branch = writers_schema.schemas[branch_index]

  read_data(writers_branch, readers_schema, decoder)
end

#skip_array(writers_schema, decoder) ⇒ Object



468
469
470
# File 'lib/avro/io.rb', line 468

# Skips an encoded array: hands #skip_blocks a block that skips one item
# of the writer's item schema each time it is called.
#
# @return [Object] whatever #skip_blocks returns
def skip_array(writers_schema, decoder)
  skip_blocks(decoder) do
    skip_data(writers_schema.items, decoder)
  end
end

#skip_data(writers_schema, decoder) ⇒ Object



420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
# File 'lib/avro/io.rb', line 420

# Skips one encoded datum of the given writer's schema without decoding
# it. Primitive types are delegated to the matching decoder.skip_*
# method; every other type goes to the corresponding skip_* helper.
#
# @param writers_schema [Object] must respond to #type_sym (and #type for
#   the error message)
# @param decoder [Object] the stream to advance
# @return [Object] whatever the selected skip method returns
# @raise [AvroError] when the schema type is not recognised
def skip_data(writers_schema, decoder)
  case writers_schema.type_sym
  when :null    then decoder.skip_null
  when :boolean then decoder.skip_boolean
  when :string  then decoder.skip_string
  when :int     then decoder.skip_int
  when :long    then decoder.skip_long
  when :float   then decoder.skip_float
  when :double  then decoder.skip_double
  when :bytes   then decoder.skip_bytes
  when :fixed   then skip_fixed(writers_schema, decoder)
  when :enum    then skip_enum(writers_schema, decoder)
  when :array   then skip_array(writers_schema, decoder)
  when :map     then skip_map(writers_schema, decoder)
  when :union   then skip_union(writers_schema, decoder)
  when :record, :error, :request
    skip_record(writers_schema, decoder)
  else
    raise AvroError, "Unknown schema type: #{writers_schema.type}"
  end
end

#skip_enum(writers_schema, decoder) ⇒ Object



459
460
461
# File 'lib/avro/io.rb', line 459

# Skips an encoded enum value by skipping one int on +decoder+.
# +writers_schema+ is accepted but not consulted here.
def skip_enum(writers_schema, decoder)
  decoder.skip_int
end

#skip_fixed(writers_schema, decoder) ⇒ Object



455
456
457
# File 'lib/avro/io.rb', line 455

# Skips an encoded fixed value by asking +decoder+ to skip as many units
# as the writer's schema reports via #size.
#
# @return [Object] whatever decoder.skip returns
def skip_fixed(writers_schema, decoder)
  length = writers_schema.size
  decoder.skip(length)
end

#skip_map(writers_schema, decoder) ⇒ Object



472
473
474
475
476
477
# File 'lib/avro/io.rb', line 472

# Skips an encoded map: hands #skip_blocks a block that, each time it is
# called, skips one string key and then one value of the writer's value
# schema.
#
# @return [Object] whatever #skip_blocks returns
def skip_map(writers_schema, decoder)
  skip_blocks(decoder) do
    decoder.skip_string
    skip_data(writers_schema.values, decoder)
  end
end

#skip_record(writers_schema, decoder) ⇒ Object



479
480
481
# File 'lib/avro/io.rb', line 479

# Skips an encoded record by skipping each of the writer's fields in
# declaration order.
#
# @return [Object] the receiver of #each, i.e. writers_schema.fields
def skip_record(writers_schema, decoder)
  writers_schema.fields.each do |writers_field|
    skip_data(writers_field.type, decoder)
  end
end

#skip_union(writers_schema, decoder) ⇒ Object



463
464
465
466
# File 'lib/avro/io.rb', line 463

# Skips an encoded union value: a long read from +decoder+ selects one of
# the writer's branch schemas, and a datum of that branch is then skipped.
#
# @return [Object] whatever #skip_data returns for the selected branch
def skip_union(writers_schema, decoder)
  branch_index = decoder.read_long
  writers_branch = writers_schema.schemas[branch_index]
  skip_data(writers_branch, decoder)
end