Class: Avro::IO::DatumReader

Inherits:
Object
  • Object
show all
Defined in:
lib/avro/io.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(writers_schema = nil, readers_schema = nil) ⇒ DatumReader

Returns a new instance of DatumReader.



268
269
270
271
# File 'lib/avro/io.rb', line 268

# Builds a reader that remembers the schema the data was written with and
# the schema the caller wants to read it as. Both arguments are optional
# and are stored as given (nil when omitted).
def initialize(writers_schema=nil, readers_schema=nil)
  @writers_schema, @readers_schema = writers_schema, readers_schema
end

Instance Attribute Details

#readers_schema ⇒ Object

Returns the value of attribute readers_schema.



266
267
268
# File 'lib/avro/io.rb', line 266

# Returns the schema stored in @readers_schema, i.e. the schema data is
# resolved against when reading. It is nil until one has been assigned.
def readers_schema
  @readers_schema
end

#writers_schema ⇒ Object

Returns the value of attribute writers_schema.



266
267
268
# File 'lib/avro/io.rb', line 266

# Returns the schema stored in @writers_schema, i.e. the schema used to
# drive decoding. It is nil until one has been assigned.
def writers_schema
  @writers_schema
end

Class Method Details

.match_schemas(writers_schema, readers_schema) ⇒ Object



223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
# File 'lib/avro/io.rb', line 223

# Decides whether data written with +writers_schema+ may be read using
# +readers_schema+. Returns true or false.
#
# Rules applied, in order:
# * a union on either side always matches here (the concrete branch is
#   chosen later, while reading);
# * identical primitive types match;
# * records, errors and enums match when their full names are equal;
#   fixed types additionally need equal sizes; requests always match;
# * maps and arrays match when their value/item schemas match under these
#   same rules (checked recursively, so promotions and unions inside
#   containers are honoured and mismatches in nested containers are caught);
# * otherwise only the numeric promotions int -> long/float/double,
#   long -> float/double and float -> double match.
def self.match_schemas(writers_schema, readers_schema)
  w_type = writers_schema.type_sym
  r_type = readers_schema.type_sym

  return true if w_type == :union || r_type == :union

  if w_type == r_type
    return true if Schema::PRIMITIVE_TYPES_SYM.include?(r_type)

    case r_type
    when :record, :error, :enum
      return writers_schema.fullname == readers_schema.fullname
    when :request
      return true
    when :fixed
      return writers_schema.fullname == readers_schema.fullname &&
             writers_schema.size == readers_schema.size
    when :map
      # Compare the element schemas themselves, not just their type names.
      return match_schemas(writers_schema.values, readers_schema.values)
    when :array
      return match_schemas(writers_schema.items, readers_schema.items)
    end
  end

  # Schema promotion: the writer's numeric type may widen to the reader's.
  case w_type
  when :int   then [:long, :float, :double].include?(r_type)
  when :long  then [:float, :double].include?(r_type)
  when :float then r_type == :double
  else false
  end
end

Instance Method Details

#read(decoder) ⇒ Object



273
274
275
276
# File 'lib/avro/io.rb', line 273

# Reads one datum from +decoder+ and returns it.
#
# When no reader's schema has been set yet, the writer's schema is stored
# as the reader's schema first, so the data is read back exactly as written.
def read(decoder)
  self.readers_schema ||= writers_schema
  read_data(writers_schema, readers_schema, decoder)
end

#read_array(writers_schema, readers_schema, decoder) ⇒ Object



335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
# File 'lib/avro/io.rb', line 335

# Decodes an array and returns its elements as a Ruby Array.
#
# The encoded form is consumed as a series of blocks: each block starts
# with a long giving the element count and a count of 0 ends the array.
# A negative count is negated and followed by one extra long, which is
# read and discarded (presumably the block's byte size -- confirm against
# the Avro binary encoding). Every element is decoded with read_data using
# the item schemas of the writer and the reader.
def read_array(writers_schema, readers_schema, decoder)
  elements = []

  loop do
    remaining = decoder.read_long
    break if remaining == 0

    if remaining < 0
      remaining = -remaining
      decoder.read_long # consumed only to advance the decoder
    end

    remaining.times do
      elements << read_data(writers_schema.items, readers_schema.items, decoder)
    end
  end

  elements
end

#read_data(writers_schema, readers_schema, decoder) ⇒ Object



278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
# File 'lib/avro/io.rb', line 278

# Decodes one datum written with +writers_schema+, resolves it against
# +readers_schema+, and returns the result after passing it through the
# reader schema's type adapter.
#
# Raises SchemaMatchException when the two schemas do not match (or when
# no branch of a reader union matches the writer's schema), and AvroError
# when the writer's schema has a type this method does not know.
def read_data(writers_schema, readers_schema, decoder)
  matcher = self.class

  unless matcher.match_schemas(writers_schema, readers_schema)
    raise SchemaMatchException.new(writers_schema, readers_schema)
  end

  # Reader expects a union but the writer wrote a single type: pick the
  # first union branch that matches the writer's schema and read as that.
  if writers_schema.type_sym != :union && readers_schema.type_sym == :union
    branch = readers_schema.schemas.find { |candidate| matcher.match_schemas(writers_schema, candidate) }
    raise SchemaMatchException.new(writers_schema, readers_schema) unless branch

    return read_data(writers_schema, branch, decoder)
  end

  # The writer's type decides how the bytes are decoded.
  datum =
    case writers_schema.type_sym
    when :null    then decoder.read_null
    when :boolean then decoder.read_boolean
    when :string  then decoder.read_string
    when :int     then decoder.read_int
    when :long    then decoder.read_long
    when :float   then decoder.read_float
    when :double  then decoder.read_double
    when :bytes   then decoder.read_bytes
    when :fixed   then read_fixed(writers_schema, readers_schema, decoder)
    when :enum    then read_enum(writers_schema, readers_schema, decoder)
    when :array   then read_array(writers_schema, readers_schema, decoder)
    when :map     then read_map(writers_schema, readers_schema, decoder)
    when :union   then read_union(writers_schema, readers_schema, decoder)
    when :record, :error, :request
      read_record(writers_schema, readers_schema, decoder)
    else
      raise AvroError, "Cannot read unknown schema type: #{writers_schema.type}"
    end

  readers_schema.type_adapter.decode(datum)
end

#read_default_value(field_schema, default_value) ⇒ Object



411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
# File 'lib/avro/io.rb', line 411

# Converts +default_value+ (a field default as it appears in the schema's
# JSON) into the Ruby value a decoded field of type +field_schema+ holds.
#
# * null                          -> nil
# * boolean, enum, fixed, string, bytes -> the value unchanged
# * int, long                     -> Integer(default_value)
# * float, double                 -> Float(default_value)
# * array / map                   -> each element / value converted
#                                    recursively with the item / value schema
# * union                         -> converted with the union's first schema
# * record, error                 -> a Hash keyed by field name; a field
#                                    missing from +default_value+ falls back
#                                    to that field's own default
#
# Raises AvroError for any other schema type.
def read_default_value(field_schema, default_value)
  case field_schema.type_sym
  when :null
    nil
  when :boolean, :enum, :fixed, :string, :bytes
    default_value
  when :int, :long
    Integer(default_value)
  when :float, :double
    Float(default_value)
  when :array
    default_value.map do |json_val|
      read_default_value(field_schema.items, json_val)
    end
  when :map
    default_value.each_with_object({}) do |(key, json_val), converted|
      converted[key] = read_default_value(field_schema.values, json_val)
    end
  when :union
    read_default_value(field_schema.schemas[0], default_value)
  when :record, :error
    field_schema.fields.each_with_object({}) do |field, record|
      json_val = default_value[field.name]
      # Only fall back when the value is absent: an explicit `false` is a
      # real default and must not be replaced by the field's own default.
      json_val = field.default if json_val.nil?
      record[field.name] = read_default_value(field.type, json_val)
    end
  else
    raise AvroError, "Unknown type: #{field_schema.type}"
  end
end

#read_enum(writers_schema, readers_schema, decoder) ⇒ Object



322
323
324
325
326
327
328
329
330
331
332
333
# File 'lib/avro/io.rb', line 322

# Decodes an enum value: reads the symbol's index as an int from +decoder+
# and returns the entry at that index in the writer's symbol list.
#
# The reader's symbol list is consulted but the outcome is not acted on:
# a symbol unknown to the reader is currently returned unchanged.
def read_enum(writers_schema, readers_schema, decoder)
  ordinal = decoder.read_int
  symbol = writers_schema.symbols[ordinal]

  # TODO(jmhodges): figure out what unset means for resolution.
  # Schema resolution placeholder: the membership test is evaluated, but
  # nothing is done yet when the reader does not know the symbol ('unset').
  _reader_knows_symbol = readers_schema.symbols.include?(symbol)

  symbol
end

#read_fixed(writers_schema, readers_schema, decoder) ⇒ Object



318
319
320
# File 'lib/avro/io.rb', line 318

# Decodes a fixed value by reading as many bytes from +decoder+ as the
# writer's schema declares in its size. The reader's schema is not used.
def read_fixed(writers_schema, readers_schema, decoder)
  byte_count = writers_schema.size
  decoder.read(byte_count)
end

#read_map(writers_schema, readers_schema, decoder) ⇒ Object



354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
# File 'lib/avro/io.rb', line 354

# Decodes a map and returns it as a Ruby Hash.
#
# The encoded form is consumed as a series of blocks: each block starts
# with a long giving the entry count and a count of 0 ends the map.
# A negative count is negated and followed by one extra long, which is
# read and discarded (presumably the block's byte size -- confirm against
# the Avro binary encoding). Each entry is a string key followed by a value
# decoded with read_data using the value schemas of the writer and reader.
def read_map(writers_schema, readers_schema, decoder)
  entries = {}

  loop do
    remaining = decoder.read_long
    break if remaining == 0

    if remaining < 0
      remaining = -remaining
      decoder.read_long # consumed only to advance the decoder
    end

    remaining.times do
      name = decoder.read_string
      entries[name] = read_data(writers_schema.values, readers_schema.values, decoder)
    end
  end

  entries
end

#read_record(writers_schema, readers_schema, decoder) ⇒ Object



381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
# File 'lib/avro/io.rb', line 381

# Decodes a record and returns it as a Hash keyed by field name.
#
# The writer's fields are walked in order: a field the reader also declares
# (matched by name) is decoded with read_data, any other field is skipped
# in the input. Afterwards, reader fields that the writer does not declare
# are filled from their default, provided that default is not nil; reader
# fields without a default are left out of the result.
def read_record(writers_schema, readers_schema, decoder)
  reader_fields = readers_schema.fields_hash
  result = {}

  writers_schema.fields.each do |written|
    target = reader_fields[written.name]
    if target
      result[written.name] = read_data(written.type, target.type, decoder)
    else
      skip_data(written.type, decoder)
    end
  end

  # Nothing to fill in unless the reader declares more fields than were read.
  return result unless reader_fields.size > result.size

  writer_fields = writers_schema.fields_hash
  reader_fields.each do |name, reader_field|
    next if writer_fields.key?(name)
    # FIXME(jmhodges) another 'unset' here: no default, so the field is omitted.
    next if reader_field.default.nil?

    result[reader_field.name] = read_default_value(reader_field.type, reader_field.default)
  end

  result
end

#read_union(writers_schema, readers_schema, decoder) ⇒ Object



374
375
376
377
378
379
# File 'lib/avro/io.rb', line 374

# Decodes a union value: reads a long from +decoder+, uses it as the index
# of the branch in the writer's union, and decodes the value with that
# branch as the writer's schema. The reader's schema is passed through
# unchanged.
def read_union(writers_schema, readers_schema, decoder)
  branch_index = decoder.read_long
  read_data(writers_schema.schemas[branch_index], readers_schema, decoder)
end

#skip_array(writers_schema, decoder) ⇒ Object



503
504
505
# File 'lib/avro/io.rb', line 503

# Skips an encoded array without building it: hands +decoder+ to
# skip_blocks along with a block that skips a single item of the writer's
# item schema.
def skip_array(writers_schema, decoder)
  skip_blocks(decoder) do
    skip_data(writers_schema.items, decoder)
  end
end

#skip_data(writers_schema, decoder) ⇒ Object



455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
# File 'lib/avro/io.rb', line 455

# Advances +decoder+ past one datum of type +writers_schema+ without
# decoding it. Primitive types are skipped by the decoder itself; fixed,
# enum, array, map, union and record-like types go to the matching skip_*
# method. Raises AvroError for a schema type it does not know.
def skip_data(writers_schema, decoder)
  case writers_schema.type_sym
  when :null    then decoder.skip_null
  when :boolean then decoder.skip_boolean
  when :string  then decoder.skip_string
  when :int     then decoder.skip_int
  when :long    then decoder.skip_long
  when :float   then decoder.skip_float
  when :double  then decoder.skip_double
  when :bytes   then decoder.skip_bytes
  when :fixed   then skip_fixed(writers_schema, decoder)
  when :enum    then skip_enum(writers_schema, decoder)
  when :array   then skip_array(writers_schema, decoder)
  when :map     then skip_map(writers_schema, decoder)
  when :union   then skip_union(writers_schema, decoder)
  when :record, :error, :request
    skip_record(writers_schema, decoder)
  else
    raise AvroError, "Unknown schema type: #{writers_schema.type}"
  end
end

#skip_enum(writers_schema, decoder) ⇒ Object



494
495
496
# File 'lib/avro/io.rb', line 494

# Skips an encoded enum value by skipping a single int on +decoder+ (the
# same kind of value read_enum reads as the symbol index).
# +writers_schema+ is accepted for a uniform skip_* signature but not used.
def skip_enum(writers_schema, decoder)
  decoder.skip_int
end

#skip_fixed(writers_schema, decoder) ⇒ Object



490
491
492
# File 'lib/avro/io.rb', line 490

# Skips an encoded fixed value by asking +decoder+ to skip as many bytes
# as the writer's schema declares in its size.
def skip_fixed(writers_schema, decoder)
  byte_count = writers_schema.size
  decoder.skip(byte_count)
end

#skip_map(writers_schema, decoder) ⇒ Object



507
508
509
510
511
512
# File 'lib/avro/io.rb', line 507

# Skips an encoded map without building it: hands +decoder+ to skip_blocks
# along with a block that skips one entry, i.e. the string key followed by
# a value of the writer's value schema.
def skip_map(writers_schema, decoder)
  skip_blocks(decoder) do
    decoder.skip_string
    skip_data(writers_schema.values, decoder)
  end
end

#skip_record(writers_schema, decoder) ⇒ Object



514
515
516
# File 'lib/avro/io.rb', line 514

# Skips an encoded record by skipping each of the writer's fields in
# declaration order, using the field's own schema.
def skip_record(writers_schema, decoder)
  writers_schema.fields.each do |field|
    skip_data(field.type, decoder)
  end
end

#skip_union(writers_schema, decoder) ⇒ Object



498
499
500
501
# File 'lib/avro/io.rb', line 498

# Skips an encoded union value: reads a long from +decoder+, uses it as
# the index of the branch in the writer's union, and skips one datum of
# that branch's schema.
def skip_union(writers_schema, decoder)
  branch_index = decoder.read_long
  branch = writers_schema.schemas[branch_index]
  skip_data(branch, decoder)
end