Class: Embulk::Guess::Msgpack

Inherits:
GuessPlugin (which itself inherits from Object)
Defined in:
lib/embulk/guess/msgpack.rb

Instance Method Summary

Instance Method Details

#guess(config, sample_buffer) ⇒ Object



(source lines 7–82)
# File 'lib/embulk/guess/msgpack.rb', line 7

# Guesses msgpack parser settings ("file_encoding", "row_encoding" and
# "columns") from a sample of the input.
#
# When either encoding is missing from the parser config, both are detected
# from the start of the sample:
# - an array whose first element is also an array => file "array",    rows "array"
# - an array whose first element is not an array   => file "sequence", rows "array"
# - a map                                          => file "sequence", rows "map"
# - anything else                                  => not msgpack, returns {}
# NOTE: if only one of the two encodings is configured, detection assigns
# both, so the configured one is overwritten.
#
# @param config [Hash] current config; only its "parser" section is read
# @param sample_buffer [String] sample bytes of the input (converted to a Java
#   byte[] by new_unpacker)
# @return [Hash] {"parser" => {"type", "row_encoding", "file_encoding", "columns"}},
#   or {} when the parser type is not msgpack, the sample is not recognizable
#   msgpack, it yields 3 or fewer rows, or row_encoding is unrecognized
def guess(config, sample_buffer)
  return {} unless config.fetch("parser", {}).fetch("type", "msgpack") == "msgpack"

  parser_config = config["parser"] || {}

  # Load the bundled jars so the org.msgpack classes can be resolved.
  classpath = File.expand_path('../../../../classpath', __FILE__)
  Dir["#{classpath}/*.jar"].each { |jar| require jar }

  file_encoding = parser_config["file_encoding"]
  row_encoding = parser_config["row_encoding"]

  if !file_encoding || !row_encoding
    uk = new_unpacker(sample_buffer)
    begin
      uk.unpackArrayHeader
      begin
        # Outer array succeeded; an array as its first element means the
        # whole file is one array of array-rows.
        uk.unpackArrayHeader
        file_encoding = "array"
        row_encoding = "array"
      rescue org.msgpack.core.MessageTypeException
        file_encoding = "sequence"
        row_encoding = "array"
      end
    rescue org.msgpack.core.MessageTypeException
      # A failed unpackArrayHeader may consume input, so restart with a fresh
      # unpacker before trying a map header.
      uk = new_unpacker(sample_buffer)
      begin
        uk.unpackMapHeader
        file_encoding = "sequence"
        row_encoding = "map"
      rescue org.msgpack.core.MessageTypeException
        return {}  # not a msgpack
      end
    end
  end

  uk = new_unpacker(sample_buffer)

  # An "array" file wraps every row in one outer array: skip that header so
  # the rows can be read one value at a time, like a "sequence" file.
  # Any other file_encoding is read as a sequence as-is.
  uk.unpackArrayHeader if file_encoding == "array"

  rows = []
  begin
    loop do
      rows << JSON.parse(uk.unpackValue.toJson)
    end
  rescue java.io.EOFException
    # end of the sample reached
  end

  # Too few rows to guess a schema from.
  return {} if rows.size <= 3

  schema =
    case row_encoding
    when "map"
      Embulk::Guess::SchemaGuess.from_hash_records(rows)
    when "array"
      # Rows may differ in length; name columns c0..cN after the widest row.
      column_count = rows.map(&:size).max
      column_names = column_count.times.map { |i| "c#{i}" }
      Embulk::Guess::SchemaGuess.from_array_records(column_names, rows)
    else
      # Only reachable with a user-configured, unrecognized row_encoding:
      # no schema can be built, so emit no guess rather than nil columns.
      return {}
    end

  parser_guessed = {"type" => "msgpack"}
  parser_guessed["row_encoding"] = row_encoding
  parser_guessed["file_encoding"] = file_encoding
  parser_guessed["columns"] = schema

  {"parser" => parser_guessed}

rescue org.msgpack.core.MessagePackException
  {}
rescue java.io.EOFException
  # The sample ended during encoding detection (e.g. an empty sample).
  {}
end

#new_unpacker(sample_buffer) ⇒ Object



(source lines 84–86)
# File 'lib/embulk/guess/msgpack.rb', line 84

# Returns a new msgpack-java default unpacker positioned at the start of
# +sample_buffer+. Callers create a fresh one per pass, because unpacking
# advances the reader past consumed bytes.
#
# @param sample_buffer [String] raw sample bytes; converted with to_java_bytes
# @return [Object] an org.msgpack.core unpacker over those bytes
def new_unpacker(sample_buffer)
  msgpack = org.msgpack.core.MessagePack
  msgpack.newDefaultUnpacker(sample_buffer.to_java_bytes)
end