Class: Embulk::Schema
- Inherits:
-
Array
- Object
- Array
- Embulk::Schema
- Defined in:
- lib/embulk/schema.rb
Instance Attribute Summary collapse
-
#names ⇒ Object
readonly
Returns the value of attribute names.
-
#types ⇒ Object
readonly
Returns the value of attribute types.
Class Method Summary collapse
Instance Method Summary collapse
-
#initialize(columns) ⇒ Schema
constructor
A new instance of Schema.
- #read_record(page_reader) ⇒ Object
- #to_java ⇒ Object
- #write_record(page_builder, record) ⇒ Object
Constructor Details
#initialize(columns) ⇒ Schema
Returns a new instance of Schema.
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/embulk/schema.rb', line 7

# Builds the schema from +columns+ and pre-compiles two lambdas, stored in
# @record_reader and @record_writer, which #read_record and #write_record
# later invoke. The lambdas are produced by assembling Ruby source text with
# one branch per column and passing that text to +eval+.
#
# @param columns [Enumerable] objects responding to +index+, +name+, +type+
#   and +format+; each one is re-created as a Column positioned at its
#   offset in the list
# @raise [RuntimeError] if a column carries a non-nil +index+ that differs
#   from its position, or if a column's +type+ is not one of :boolean,
#   :long, :double, :string, :timestamp, :json
def initialize(columns)
  # Normalise every column so that its index equals its position.
  columns = columns.map.with_index {|c,index|
    if c.index && c.index != index # TODO ignore this error?
      raise "Index of column '#{c.name}' is #{c.index} but it is at column #{index}."
    end
    Column.new(index, c.name, c.type, c.format)
  }
  # Populate the underlying Array with the normalised columns.
  super(columns)

  # --- reader: source of a lambda that turns the reader's current row into
  # --- a Ruby Array with one element per column.
  record_reader_script =
    "lambda do |reader|\n" <<
    "record = []\n"
  each do |column|
    idx = column.index
    # One if/else per column: nil when the reader reports null, otherwise the
    # type-specific getter. The string chosen by the `case` is concatenated
    # directly with "end\n" (no newline in between), so the generated text
    # reads e.g. `record << reader.getBoolean(0)end`.
    # NOTE(review): this relies on Ruby accepting `)end` without a separator.
    column_script =
      "if reader.isNull(#{idx})\n" <<
      "record << nil\n" <<
      "else\n" <<
      case column.type
      when :boolean
        "record << reader.getBoolean(#{idx})"
      when :long
        "record << reader.getLong(#{idx})"
      when :double
        "record << reader.getDouble(#{idx})"
      when :string
        "record << reader.getString(#{idx})"
      when :timestamp
        # Generated code: builds a JRuby RubyTime from the timestamp's epoch
        # milliseconds, calls gmtime on it, then calls setNSec with the
        # timestamp's getNano() value.
        "record << (java_timestamp = reader.getTimestamp(#{idx}); ruby_time = Java::org.jruby.RubyTime.new(JRuby.runtime, JRuby.runtime.getClass('Time'), Java::org.joda.time.DateTime.new(java_timestamp.toEpochMilli())).gmtime().to_java(Java::org.jruby.RubyTime); ruby_time.setNSec(java_timestamp.getNano()); ruby_time)"
      when :json
        # Generated code: packs the Java-side JSON value to msgpack bytes and
        # unpacks those bytes with the Ruby MessagePack module.
        "record << MessagePack.unpack(String.from_java_bytes((::Java::org.msgpack.core.MessagePack.newDefaultBufferPacker()).packValue(reader.getJson(#{idx})).toMessageBuffer().toByteArray()))"
      else
        # Raised while the script is being assembled, i.e. before any eval.
        raise "Unknown type #{column.type.inspect}"
      end <<
      "end\n"
    record_reader_script << column_script << "\n"
  end
  # The lambda's value is the accumulated record array.
  record_reader_script << "record\n"
  record_reader_script << "end"
  # The script text is assembled entirely within this method from string
  # literals and each column's index.
  @record_reader = eval(record_reader_script)

  # --- writer: source of a lambda that copies one Ruby record (indexed by
  # --- column position) into the builder and then calls addRecord.
  record_writer_script = "lambda do |builder,record|\n"
  # Bound once inside the generated lambda for use by the timestamp branches.
  record_writer_script << "java_timestamp_class = ::Embulk::Java::Timestamp\n"
  each do |column|
    idx = column.index
    # Same shape as the reader: setNull for nil values, otherwise the
    # type-specific setter, again joined directly to "end\n".
    column_script =
      "if record[#{idx}].nil?\n" <<
      "builder.setNull(#{idx})\n" <<
      "else\n" <<
      case column.type
      when :boolean
        "builder.setBoolean(#{idx}, record[#{idx}])"
      when :long
        "builder.setLong(#{idx}, record[#{idx}])"
      when :double
        "builder.setDouble(#{idx}, record[#{idx}])"
      when :string
        "builder.setString(#{idx}, record[#{idx}])"
      when :timestamp
        # It was originally expecting that `record[#{idx}]` was a Ruby Time object. Does it really happen?
        # Generated code: splits the RubyTime's millisecond value into whole
        # seconds and a nanosecond adjustment (getNSec plus the millisecond
        # remainder scaled by 1_000_000) for ofEpochSecond.
        "(ruby_time = record[#{idx}].to_java(Java::org.jruby.RubyTime); msec = ruby_time.getDateTime().getMillis(); builder.setTimestamp(#{idx}, java_timestamp_class.ofEpochSecond(msec / 1000, ruby_time.getNSec() + (msec % 1000) * 1000000)))"
      when :json
        # Generated code: packs the Ruby value with MessagePack and unpacks
        # the bytes into a Java msgpack value for setJson.
        "builder.setJson(#{idx}, ::Java::org.msgpack.core.MessagePack.newDefaultUnpacker(MessagePack.pack(record[#{idx}]).to_java_bytes).unpackValue())"
      else
        raise "Unknown type #{column.type.inspect}"
      end <<
      "end\n"
    record_writer_script << column_script << "\n"
  end
  record_writer_script << "builder.addRecord\n"
  record_writer_script << "end"
  @record_writer = eval(record_writer_script)

  # Cache the per-column names and types for the attribute readers.
  @names = map {|c| c.name }
  @types = map {|c| c.type }

  # Freezes this schema object itself; the @names and @types arrays are not
  # frozen by this call.
  freeze
end
Instance Attribute Details
#names ⇒ Object (readonly)
Returns the value of attribute names.
87 88 89 |
# File 'lib/embulk/schema.rb', line 87

# Reader for the +names+ attribute.
#
# @return [Object] the value held in @names (assigned in #initialize from
#   each column's +name+)
def names
  @names
end
#types ⇒ Object (readonly)
Returns the value of attribute types.
87 88 89 |
# File 'lib/embulk/schema.rb', line 87

# Reader for the +types+ attribute.
#
# @return [Object] the value held in @types (assigned in #initialize from
#   each column's +type+)
def types
  @types
end
Class Method Details
Instance Method Details
#read_record(page_reader) ⇒ Object
89 90 91 |
# File 'lib/embulk/schema.rb', line 89

# Invokes the reader lambda compiled in #initialize with +page_reader+.
#
# @param page_reader [Object] passed unchanged to the compiled lambda
# @return [Object] whatever the compiled lambda returns
def read_record(page_reader)
  @record_reader.call(page_reader)
end
#to_java ⇒ Object
101 102 103 104 |
# File 'lib/embulk/schema.rb', line 101

# Converts every column with its own +to_java+ and wraps the resulting list
# in a new Java::Schema.
#
# @return [Object] the object returned by Java::Schema.new
def to_java
  Java::Schema.new(map(&:to_java))
end
#write_record(page_builder, record) ⇒ Object
93 94 95 |
# File 'lib/embulk/schema.rb', line 93

# Invokes the writer lambda compiled in #initialize with +page_builder+ and
# +record+.
#
# @param page_builder [Object] passed unchanged as the lambda's first argument
# @param record [Object] passed unchanged as the lambda's second argument;
#   the compiled lambda indexes it by column position
# @return [Object] whatever the compiled lambda returns
def write_record(page_builder, record)
  @record_writer.call(page_builder, record)
end