Class: CSV2Avro::Converter

Inherits:
Object
  • Object
show all
Defined in:
lib/csv2avro/converter.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(reader, writer, bad_rows_writer, filename, options, schema: schema) ⇒ Converter

Returns a new instance of Converter.



17
18
19
20
21
22
23
24
25
26
27
# File 'lib/csv2avro/converter.rb', line 17

def initialize(reader, writer, bad_rows_writer, filename, options, schema: schema)
  @reader = reader
  @writer = writer
  @bad_rows_writer = bad_rows_writer
  @filename = filename
  @options = options
  @schema = schema

  # read header row explicitly
  @header = @reader.readline.strip.split(col_sep).map{ |col| col.gsub('"','') }
end

Class Method Details

.loggerObject



9
10
11
# File 'lib/csv2avro/converter.rb', line 9

def self.logger
  @logger ||= Logr::Logger.new('csv2avro.converter')
end

Instance Method Details

#convertObject



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/csv2avro/converter.rb', line 29

def convert
  while not csv.eof? do
    begin
      row = csv.shift
    rescue CSV::MalformedCSVError
      error_msg = "L#{row_number}: Unable to parse"
      logger.event('parse_error', filename: @filename, line: row_number)
            .error(error_msg)
      @bad_rows_writer.puts(error_msg)
      next
    end
    hash = row.to_hash

    add_defaults_to_hash!(hash) if @options[:write_defaults]
    convert_fields!(hash)

    begin
      @writer.write(hash)
    rescue CSV2Avro::SchemaValidationError => e
      error_msg = "L#{row_number}: #{e.errors.join(', ')}"
      e.errors.each do |error|
        logger.event('schema_violation', filename: @filename, line: row_number, cause: error)
              .error(error_msg)
      end
      @bad_rows_writer.puts(error_msg)
    end
  end
  @writer.flush
  row_number
end

#loggerObject



13
14
15
# File 'lib/csv2avro/converter.rb', line 13

def logger
  self.class.logger
end