Class: Embulk::Guess::CsvGuessPlugin

Inherits:
LineGuessPlugin
  • Object
show all
Defined in:
lib/embulk/guess/csv_verify.rb

Constant Summary collapse

# Class loader returned by create_classloader; every *_CLASS constant below is
# loaded by name through it.
CLASSLOADER =
create_classloader
CONFIG_MAPPER_FACTORY_CLASS =
CLASSLOADER.loadClass("org.embulk.util.config.ConfigMapperFactory").ruby_class
# Factory instance built once from the builder with its default modules added.
CONFIG_MAPPER_FACTORY =
CONFIG_MAPPER_FACTORY_CLASS.builder.addDefaultModules.build
LEGACY_PLUGIN_TASK_CLASS =
CLASSLOADER.loadClass("org.embulk.standards.CsvParserPlugin$PluginTask").ruby_class
LIST_FILE_INPUT_CLASS =
CLASSLOADER.loadClass("org.embulk.util.file.ListFileInput").ruby_class
LINE_DECODER_CLASS =
CLASSLOADER.loadClass("org.embulk.util.text.LineDecoder").ruby_class
# Java-based CSV guess plugin; guess_lines instantiates it to cross-check the
# Ruby-based result.
CSV_GUESS_PLUGIN_CLASS =
CLASSLOADER.loadClass("org.embulk.guess.csv.CsvGuessPlugin").ruby_class
LEGACY_CSV_TOKENIZER_CLASS =
CLASSLOADER.loadClass("org.embulk.standards.CsvTokenizer").ruby_class
LEGACY_TOO_FEW_COLUMNS_EXCEPTION_CLASS =
CLASSLOADER.loadClass("org.embulk.standards.CsvTokenizer$TooFewColumnsException").ruby_class
LEGACY_INVALID_VALUE_EXCEPTION_CLASS =
CLASSLOADER.loadClass("org.embulk.standards.CsvTokenizer$InvalidValueException").ruby_class
# Candidate delimiters. guess_lines_iter falls back to the first entry (",")
# when no delimiter can be guessed, i.e. it assumes a single-column CSV.
DELIMITER_CANDIDATES =
[
  ",", "\t", "|", ";"
]
# Candidate quote characters: double quote and single quote.
QUOTE_CANDIDATES =
[
  "\"", "'"
]
# Candidate escape characters: backslash and double quote.
ESCAPE_CANDIDATES =
[
  "\\", '"'
]
# Candidate strings that may represent a null value in the data.
NULL_STRING_CANDIDATES =
[
  "null",
  "NULL",
  "#N/A",
  "\\N",  # MySQL LOAD, Hive STORED AS TEXTFILE
]
# Candidate prefixes that may mark a comment line.
COMMENT_LINE_MARKER_CANDIDATES =
[
  "#",
  "//",
]
# NOTE(review): the two limits below are not referenced in the code shown on
# this page; presumably they bound header-line skip detection -- confirm
# against guess_skip_header_lines.
MAX_SKIP_LINES =
10
NO_SKIP_DETECT_LINES =
10

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.create_classloader ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/embulk/guess/csv_verify.rb', line 9

# Creates the class loader used to load this plugin's Java classes.
#
# Every *.jar found (recursively) under the "classpath" directory, located
# four levels above this file, is converted to a URL and handed to the class
# loader factory stored in the global
# $temporary_internal_plugin_class_loader_factory__.
#
# Before using that global, the method checks that the Java side
# (org.embulk.jruby.JRubyPluginSource) advertises exactly that variable name.
#
# @return [Object] whatever the factory's #create returns
# @raise [PluginLoadError] if the Java constant cannot be read, or if it does
#   not hold the expected variable name
def self.create_classloader
  classpath_dir = File.expand_path('../../../../classpath', __FILE__)
  jar_urls = Dir["#{classpath_dir}/**/*.jar"].map do |jar_path|
    java.io.File.new(File.expand_path(jar_path)).toURI.toURL
  end

  advertised_variable_name =
    begin
      Java::org.embulk.jruby.JRubyPluginSource::PLUGIN_CLASS_LOADER_FACTORY_VARIABLE_NAME
    rescue
      raise PluginLoadError.new("Java's org.embulk.jruby.JRubyPluginSource does not define PLUGIN_CLASS_LOADER_FACTORY_VARIABLE_NAME unexpectedly.")
    end

  unless advertised_variable_name == "$temporary_internal_plugin_class_loader_factory__"
    raise PluginLoadError.new("Java's org.embulk.jruby.JRubyPluginSource does not define PLUGIN_CLASS_LOADER_FACTORY_VARIABLE_NAME correctly.")
  end

  # The parent loader is JRuby's own class loader.
  $temporary_internal_plugin_class_loader_factory__.create(jar_urls, JRuby.runtime.getJRubyClassLoader())
end

Instance Method Details

#guess_lines(config, sample_lines) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/embulk/guess/csv_verify.rb', line 62

# Guesses CSV parser settings with the Ruby implementation and cross-checks
# the result against the Java-based guess plugin.
#
# The Java result is used for verification only. A null result, a mismatch
# with the Ruby result, or any error raised while verifying is logged through
# Embulk.logger and otherwise ignored.
#
# @param config the guess configuration, passed to guess_lines_iter and
#   (converted with config_to_java) to the Java plugin
# @param sample_lines the sample lines to guess from
# @return the result of the Ruby-based guess_lines_iter, always
def guess_lines(config, sample_lines)
  ruby_result = guess_lines_iter(config, sample_lines)

  begin
    java_result = CSV_GUESS_PLUGIN_CLASS.new.guess_lines(config_to_java(config), config_to_java(sample_lines))
    raise "embulk-guess-csv (Java) returned null." if java_result.nil?

    unless java_result.equals(config_to_java(ruby_result))
      # Log the differing sections before reporting the mismatch.
      ["decoders", "parser"].each do |section|
        log_guess_diff(ruby_result, java_result, section)
      end
      raise "embulk-guess-csv has difference between Java/Ruby."
    end
  rescue Exception => verify_error
    # Deliberately broad: any failure in the Java-based guess plugin must not
    # affect the outcome, so it is only logged.
    Embulk.logger.error "[Embulk CSV guess verify] #{verify_error.inspect}"
  end

  # The Ruby-based result is what callers receive.
  ruby_result
end

#guess_lines_iter(config, sample_lines) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/embulk/guess/csv_verify.rb', line 86

# Guesses CSV parser options from sample lines (Ruby-based implementation).
#
# Options already present in config["parser"] are kept as they are; only the
# missing ones are guessed, in this order: delimiter, quote, escape,
# null_string, header lines to skip, comment_line_marker, trim_if_not_quoted,
# whether the first record is a header, and finally the column schema.
#
# @param config hash-like guess configuration; only config["parser"] is read
# @param sample_lines array-like list of sample lines
# @return [Hash] {"parser" => guessed parser config}, or {} when
#   - the configured parser type is something other than "csv",
#   - split_lines yields nil or no records for the sample, or
#   - no column types could be guessed.
def guess_lines_iter(config, sample_lines)
  return {} unless config.fetch("parser", {}).fetch("type", "csv") == "csv"

  parser_config = config["parser"] || {}
  if parser_config["type"] == "csv" && parser_config["delimiter"]
    delim = parser_config["delimiter"]
  else
    # assuming single column CSV when no delimiter can be guessed
    delim = guess_delimiter(sample_lines) || DELIMITER_CANDIDATES.first
  end

  parser_guessed = DataSource.new.merge(parser_config).merge({"type" => "csv", "delimiter" => delim})

  unless parser_guessed.has_key?("quote")
    quote = guess_quote(sample_lines, delim)
    unless quote
      # assuming CSV follows RFC for quoting, unless the sample forces quoting
      # to be disabled (in which case quote stays null)
      quote = '"' unless guess_force_no_quote(sample_lines, delim, '"')
    end
    parser_guessed["quote"] = quote
  end
  parser_guessed["quote"] = '"' if parser_guessed["quote"] == ''  # setting '' is not allowed any more. this line converts obsoleted config syntax to explicit syntax.

  unless parser_guessed.has_key?("escape")
    # escape does nothing if quote is disabled, so it is guessed only with a quote
    if quote = parser_guessed["quote"]
      escape = guess_escape(sample_lines, delim, quote)
      # assuming this CSV follows RFC for escaping when the quote is '"';
      # otherwise escaping is disabled (set null)
      escape = '"' if !escape && quote == '"'
      parser_guessed["escape"] = escape
    end
  end

  unless parser_guessed.has_key?("null_string")
    null_string = guess_null_string(sample_lines, delim)
    parser_guessed["null_string"] = null_string if null_string
    # don't even set null_string to avoid confusion of null and 'null' in YAML format
  end

  # guessing skip_header_lines should be before guessing guess_comment_line_marker
  # because lines supplied to CsvTokenizer already don't include skipped header lines.
  # skipping empty lines is also disabled here because skipping header lines is done by
  # CsvParser which doesn't skip empty lines automatically
  sample_records = split_lines(parser_guessed, false, sample_lines, delim, {})
  skip_header_lines = guess_skip_header_lines(sample_records)
  sample_lines = sample_lines[skip_header_lines..-1]
  # sample_records is not sliced here: it is recomputed from the remaining
  # sample_lines below, before it is read again.

  unless parser_guessed.has_key?("comment_line_marker")
    comment_line_marker, sample_lines =
      guess_comment_line_marker(sample_lines, delim, parser_guessed["quote"], parser_guessed["null_string"])
    if comment_line_marker
      parser_guessed["comment_line_marker"] = comment_line_marker
    end
  end

  sample_records = split_lines(parser_guessed, true, sample_lines, delim, {})

  # It should fail if CSV parser cannot parse sample_lines.
  if sample_records.nil? || sample_records.empty?
    return {}
  end

  if sample_lines.size == 1
    # The file contains only 1 line. Assume that there are no header line.
    header_line = false

    column_types = SchemaGuess.types_from_array_records(sample_records[0, 1])

    unless parser_guessed.has_key?("trim_if_not_quoted")
      # Enable trimming only when it changes the guessed column types.
      sample_records_trimmed = split_lines(parser_guessed, true, sample_lines, delim, {"trim_if_not_quoted" => true})
      column_types_trimmed = SchemaGuess.types_from_array_records(sample_records_trimmed)
      if column_types != column_types_trimmed
        parser_guessed["trim_if_not_quoted"] = true
        column_types = column_types_trimmed
      else
        parser_guessed["trim_if_not_quoted"] = false
      end
    end
  else
    # The file contains more than 1 line. If guessed first line's column types are all strings or boolean, and the types are
    # different from the other lines, assume that the first line is column names.
    first_types = SchemaGuess.types_from_array_records(sample_records[0, 1])
    other_types = SchemaGuess.types_from_array_records(sample_records[1..-1] || [])

    unless parser_guessed.has_key?("trim_if_not_quoted")
      # Enable trimming only when it changes the types guessed for the non-first lines.
      sample_records_trimmed = split_lines(parser_guessed, true, sample_lines, delim, {"trim_if_not_quoted" => true})
      other_types_trimmed = SchemaGuess.types_from_array_records(sample_records_trimmed[1..-1] || [])
      if other_types != other_types_trimmed
        parser_guessed["trim_if_not_quoted"] = true
        other_types = other_types_trimmed
      else
        parser_guessed["trim_if_not_quoted"] = false
      end
    end

    header_line = (first_types != other_types && first_types.all? {|t| ["string", "boolean"].include?(t) }) || guess_string_header_line(sample_records)
    column_types = other_types
  end

  if column_types.empty?
    # TODO here is making the guessing failed if the file doesn't contain any columns. However,
    #      this may not be convenient for users.
    return {}
  end

  # A detected header line is one more line to skip on top of the guessed ones.
  parser_guessed["skip_header_lines"] = header_line ? skip_header_lines + 1 : skip_header_lines

  parser_guessed["allow_extra_columns"] = false unless parser_guessed.has_key?("allow_extra_columns")
  parser_guessed["allow_optional_columns"] = false unless parser_guessed.has_key?("allow_optional_columns")

  if header_line
    column_names = sample_records.first.map(&:strip)
  else
    # One generated name per guessed type: c0, c1, ... (exclusive range, so no
    # surplus name is produced).
    column_names = (0...column_types.size).map {|i| "c#{i}" }
  end

  # Header names and guessed types may differ in length; only positions that
  # have both a name and a type become columns.
  schema = []
  column_names.zip(column_types).each do |name, type|
    schema << new_column(name, type) if name && type
  end
  parser_guessed["columns"] = schema

  return {"parser" => parser_guessed}
end

#new_column(name, type) ⇒ Object



231
232
233
234
235
236
237
# File 'lib/embulk/guess/csv_verify.rb', line 231

# Builds one column entry for the guessed schema.
#
# @param name the column name
# @param type the guessed column type
# @return [Hash] {"name" => name, "type" => type}; a "format" entry holding
#   type.format is appended when type is a SchemaGuess::TimestampTypeMatch
def new_column(name, type)
  column = {"name" => name, "type" => type}
  column["format"] = type.format if type.is_a?(SchemaGuess::TimestampTypeMatch)
  column
end