Module: AthenaUDF::Utils
- Included in:
- BaseUDF
- Defined in:
- lib/athena-udf/utils.rb
Constant Summary
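- SEPARATOR = "\xFF\xFF\xFF\xFF".b — the four 0xFF bytes (the Arrow IPC continuation marker) as a binary string.
- SEPARATOR_SIZE = SEPARATOR.bytesize — the length of SEPARATOR in bytes (4).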
Instance Method Summary
- #get_record_batch_bytes(schema, record_batch) ⇒ Object
- #get_schema_bytes(schema) ⇒ Object
- #read_record_batches(schema_data, record_batch_data) ⇒ Object
- #read_schema(schema_data) ⇒ Object
Instance Method Details
#get_record_batch_bytes(schema, record_batch) ⇒ Object
Source: lib/athena-udf/utils.rb, lines 49-60
# File 'lib/athena-udf/utils.rb', line 49
# Serializes +record_batch+ as an Arrow IPC stream and returns the stream
# bytes that follow the schema message, minus the 4-byte continuation marker
# that opens the next message.
#
# @param schema [Arrow::Schema] schema the stream writer is opened with
# @param record_batch [Arrow::RecordBatch] batch to serialize
# @return [String] the length-prefixed message following the schema message,
#   plus everything else the writer emitted after it
def get_record_batch_bytes(schema, record_batch)
  buffer = Arrow::ResizableBuffer.new(0)
  Arrow::BufferOutputStream.open(buffer) do |output|
    Arrow::RecordBatchStreamWriter.open(output, schema) do |writer|
      writer.write_record_batch(record_batch)
    end
  end
  # Read the buffer only after the output stream has been closed: while the
  # stream is open the buffer can be larger than the bytes actually written,
  # and closing trims it to the written length.
  bytes = buffer.data.to_s

  # Each IPC message starts with <continuation marker><int32 LE metadata size>,
  # and that size already covers the metadata padding. Using it to find where
  # the schema message ends avoids mistaking a 0xFFFFFFFF run inside the
  # schema metadata for the next message's marker.
  length_prefix_size = 4
  schema_metadata_size = bytes.byteslice(SEPARATOR_SIZE, length_prefix_size).unpack1('l<')
  schema_message_end = SEPARATOR_SIZE + length_prefix_size + schema_metadata_size
  # Skip the next message's continuation marker, keep everything after it.
  start_index = schema_message_end + SEPARATOR_SIZE
  bytes.byteslice(start_index..-1)
end
#get_schema_bytes(schema) ⇒ Object
Source: lib/athena-udf/utils.rb, lines 36-47
# File 'lib/athena-udf/utils.rb', line 36
# Serializes +schema+ as an Arrow IPC stream that contains no record batches
# and returns the schema message without its leading continuation marker:
# the int32 metadata-size prefix followed by the (padded) schema metadata.
#
# @param schema [Arrow::Schema] schema to serialize
# @return [String] the length-prefixed schema message
def get_schema_bytes(schema)
  buffer = Arrow::ResizableBuffer.new(0)
  Arrow::BufferOutputStream.open(buffer) do |output|
    # Opening and closing the writer without writing a batch leaves the
    # schema message as the first message in the stream.
    Arrow::RecordBatchStreamWriter.open(output, schema) do |_writer|
      # noop
    end
  end
  # Read only after the output stream is closed, so the buffer holds exactly
  # the bytes written (while open it can be sized beyond them).
  bytes = buffer.data.to_s

  # Layout: <continuation marker><int32 LE metadata size><metadata + padding>.
  # Take the size prefix plus the metadata it describes, rather than scanning
  # for the next 0xFFFFFFFF, which could also occur inside the metadata.
  length_prefix_size = 4
  metadata_size = bytes.byteslice(SEPARATOR_SIZE, length_prefix_size).unpack1('l<')
  bytes.byteslice(SEPARATOR_SIZE, length_prefix_size + metadata_size)
end
#read_record_batches(schema_data, record_batch_data) ⇒ Object
Source: lib/athena-udf/utils.rb, lines 8-22
# File 'lib/athena-udf/utils.rb', line 8
# Decodes record batches from serialized Arrow IPC data and yields each one
# together with the schema read from the stream.
#
# +schema_data+ and +record_batch_data+ are concatenated and read as a single
# Arrow IPC stream, so together they must form a valid stream. Presumably
# +schema_data+ has the shape get_schema_bytes produces — confirm against
# callers.
#
# @param schema_data [String] serialized schema message bytes
# @param record_batch_data [String] serialized record batch message bytes
# @yieldparam input_schema [Arrow::Schema] schema read from the stream
# @yieldparam record_batch [Arrow::RecordBatch] each batch read from the stream
def read_record_batches(schema_data, record_batch_data)
  # bytesize avoids building an Array of every byte just to count them.
  buffer = Arrow::ResizableBuffer.new(schema_data.bytesize + record_batch_data.bytesize)
  Arrow::BufferOutputStream.open(buffer) do |output|
    output.write(schema_data)
    output.write(record_batch_data)
  end
  # Open the input stream only after the output stream is closed: while open,
  # writes may have grown the buffer past the bytes written, and closing trims
  # it so the reader never sees unwritten trailing bytes.
  Arrow::BufferInputStream.open(buffer) do |input|
    reader = Arrow::RecordBatchStreamReader.new(input)
    input_schema = reader.schema
    reader.each do |record_batch|
      yield input_schema, record_batch
    end
  end
end
#read_schema(schema_data) ⇒ Object
Source: lib/athena-udf/utils.rb, lines 24-34
# File 'lib/athena-udf/utils.rb', line 24
# Reads the schema from serialized Arrow IPC data.
#
# +schema_data+ is read as the start of an Arrow IPC stream, so it must begin
# with a valid schema message. Presumably it has the shape get_schema_bytes
# produces — confirm against callers.
#
# @param schema_data [String] serialized schema message bytes
# @return [Arrow::Schema] the schema read by the stream reader
def read_schema(schema_data)
  # bytesize avoids building an Array of every byte just to count them.
  buffer = Arrow::ResizableBuffer.new(schema_data.bytesize)
  Arrow::BufferOutputStream.open(buffer) do |output|
    output.write(schema_data)
  end
  # Open the input stream only after the output stream is closed: closing
  # trims the buffer to the bytes actually written, so the reader cannot see
  # unwritten trailing bytes left by buffer growth.
  Arrow::BufferInputStream.open(buffer) do |input|
    Arrow::RecordBatchStreamReader.new(input).schema
  end
end