Class: Embulk::Schema

Inherits:
Array
  • Object
show all
Defined in:
lib/embulk/schema.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(columns) ⇒ Schema

Returns a new instance of Schema.



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/embulk/schema.rb', line 7

def initialize(columns)
  columns = columns.map.with_index {|c,index|
    if c.index && c.index != index
      # TODO ignore this error?
      raise "Index of column '#{c.name}' is #{c.index} but it is at column #{index}."
    end
    Column.new(index, c.name, c.type, c.format)
  }
  super(columns)

  record_reader_script =
    "lambda do |reader|\n" <<
    "record = []\n"
  each do |column|
    idx = column.index
    column_script =
      "if reader.isNull(#{idx})\n" <<
      "record << nil\n" <<
      "else\n" <<
      case column.type
      when :boolean
        "record << reader.getBoolean(#{idx})"
      when :long
        "record << reader.getLong(#{idx})"
      when :double
        "record << reader.getDouble(#{idx})"
      when :string
        "record << reader.getString(#{idx})"
      when :timestamp
        "record << (java_timestamp = reader.getTimestamp(#{idx}); ruby_time = Java::org.jruby.RubyTime.new(JRuby.runtime, JRuby.runtime.getClass('Time'), Java::org.joda.time.DateTime.new(java_timestamp.toEpochMilli())).gmtime().to_java(Java::org.jruby.RubyTime); ruby_time.setNSec(java_timestamp.getNano()); ruby_time)"
      when :json
        "record << MessagePack.unpack(String.from_java_bytes((::Java::org.msgpack.core.MessagePack.newDefaultBufferPacker()).packValue(reader.getJson(#{idx})).toMessageBuffer().toByteArray()))"
      else
        raise "Unknown type #{column.type.inspect}"
      end <<
      "end\n"
    record_reader_script << column_script << "\n"
  end
  record_reader_script << "record\n"
  record_reader_script << "end"
  @record_reader = eval(record_reader_script)

  record_writer_script = "lambda do |builder,record|\n"
  record_writer_script << "java_timestamp_class = ::Embulk::Java::Timestamp\n"
  each do |column|
    idx = column.index
    column_script =
      "if record[#{idx}].nil?\n" <<
      "builder.setNull(#{idx})\n" <<
      "else\n" <<
      case column.type
      when :boolean
        "builder.setBoolean(#{idx}, record[#{idx}])"
      when :long
        "builder.setLong(#{idx}, record[#{idx}])"
      when :double
        "builder.setDouble(#{idx}, record[#{idx}])"
      when :string
        "builder.setString(#{idx}, record[#{idx}])"
      when :timestamp
        # It was originally expecting that `record[#{idx}]` was a Ruby Time object. Does it really happen?
        "(ruby_time = record[#{idx}].to_java(Java::org.jruby.RubyTime); msec = ruby_time.getDateTime().getMillis(); builder.setTimestamp(#{idx}, java_timestamp_class.ofEpochSecond(msec / 1000, ruby_time.getNSec() + (msec % 1000) * 1000000)))"
      when :json
        "builder.setJson(#{idx}, ::Java::org.msgpack.core.MessagePack.newDefaultUnpacker(MessagePack.pack(record[#{idx}]).to_java_bytes).unpackValue())"
      else
        raise "Unknown type #{column.type.inspect}"
      end <<
      "end\n"
    record_writer_script << column_script << "\n"
  end
  record_writer_script << "builder.addRecord\n"
  record_writer_script << "end"
  @record_writer = eval(record_writer_script)

  @names = map {|c| c.name }
  @types = map {|c| c.type }

  freeze
end

Instance Attribute Details

#namesObject (readonly)

Returns the value of attribute names.



87
88
89
# File 'lib/embulk/schema.rb', line 87

def names
  @names
end

#typesObject (readonly)

Returns the value of attribute types.



87
88
89
# File 'lib/embulk/schema.rb', line 87

def types
  @types
end

Class Method Details

.from_java(java_schema) ⇒ Object



97
98
99
# File 'lib/embulk/schema.rb', line 97

def self.from_java(java_schema)
  new java_schema.getColumns.map {|column| Column.from_java(column) }
end

Instance Method Details

#read_record(page_reader) ⇒ Object



89
90
91
# File 'lib/embulk/schema.rb', line 89

def read_record(page_reader)
  @record_reader.call(page_reader)
end

#to_javaObject



101
102
103
104
# File 'lib/embulk/schema.rb', line 101

def to_java
  columns = self.map {|column| column.to_java }
  Java::Schema.new(columns)
end

#write_record(page_builder, record) ⇒ Object



93
94
95
# File 'lib/embulk/schema.rb', line 93

def write_record(page_builder, record)
  @record_writer.call(page_builder, record)
end