Class: Embulk::Output::Bigquery::FileWriter
- Inherits: Object
  - Object
    - Embulk::Output::Bigquery::FileWriter
- Defined in: lib/embulk/output/bigquery/file_writer.rb
Instance Attribute Summary
- #num_rows ⇒ Object (readonly)
  Returns the value of attribute num_rows.
Instance Method Summary
- #add(page) ⇒ Object
- #close ⇒ Object
- #initialize(task, schema, index, converters = nil) ⇒ FileWriter (constructor)
  Returns a new instance of FileWriter.
- #io ⇒ Object
- #num_format(number) ⇒ Object
- #open(path, mode = 'w') ⇒ Object
- #reopen ⇒ Object
- #to_csv(record) ⇒ Object
- #to_jsonl(record) ⇒ Object
- #to_payload(record) ⇒ Object
Constructor Details
#initialize(task, schema, index, converters = nil) ⇒ FileWriter
Returns a new instance of FileWriter.
Source lines 12–33:
# File 'lib/embulk/output/bigquery/file_writer.rb', line 12
# Builds a writer for one output task.
#
# Value converters come from +converters+ when given; otherwise they are
# derived from the task and schema via ValueConverterFactory. The per-record
# formatter is chosen once, here:
# * +to_payload+ when task['payload_column_index'] is set,
# * +to_csv+ when task['source_format'] is "csv" (case-insensitive),
# * +to_jsonl+ for any other source format.
#
# NOTE(review): task['source_format'] is downcased unconditionally when no
# payload column is configured, so it is assumed to always be present —
# confirm the caller fills a default.
#
# @param task [Hash] plugin task configuration (read with string keys)
# @param schema [Object] presumably the Embulk schema; #names is read by #to_jsonl
# @param index [Object] stored as @index (not read by any method shown here)
# @param converters [Array<#call>, nil] per-column value converters
def initialize(task, schema, index, converters = nil)
  @task = task
  @schema = schema
  @index = index
  @converters = converters || ValueConverterFactory.create_converters(task, schema)

  # Row counters and timestamp used by #add for progress logging.
  @num_rows = 0
  @previous_num_rows = 0
  @progress_log_timer = Time.now

  payload_index = @task['payload_column_index']
  @formatter_proc =
    if payload_index
      @payload_column_index = payload_index
      method(:to_payload)
    elsif @task['source_format'].downcase == 'csv'
      method(:to_csv)
    else
      method(:to_jsonl)
    end
end
Instance Attribute Details
#num_rows ⇒ Object (readonly)
Returns the value of attribute num_rows.
Source lines 10–12:
# File 'lib/embulk/output/bigquery/file_writer.rb', line 10
# Number of records written so far by #add (read-only reader, equivalent
# to attr_reader :num_rows).
#
# @return [Integer]
def num_rows; @num_rows; end
Instance Method Details
#add(page) ⇒ Object
Source lines 95–114:
# File 'lib/embulk/output/bigquery/file_writer.rb', line 95
# Formats every record of +page+ with the formatter chosen in #initialize
# and writes the result to the output stream (#io). After the page is
# written, logs a progress line (total rows and rows/sec since the last
# progress line) if more than 10 seconds have passed since that last line.
#
# @param page [Enumerable] records; each is passed to the formatter
# @return [Integer] total number of rows written by this writer so far
def add(page)
  # Writing deliberately happens on the calling thread: moving it to a
  # separate IO thread fed by a SizedQueue was tried and performed worse.
  output = io
  page.each do |record|
    Embulk.logger.trace { "embulk-output-bigquery: record #{record}" }
    line = @formatter_proc.call(record)
    Embulk.logger.trace { "embulk-output-bigquery: formatted_record #{line.chomp}" }
    output.write(line)
    @num_rows += 1
  end

  now = Time.now
  # Checked once per page, so at most one progress line per call.
  if now - 10 > @progress_log_timer
    elapsed_sec = (now - @progress_log_timer).to_f
    speed = ((@num_rows - @previous_num_rows) / elapsed_sec).round(1)
    @progress_log_timer = now
    @previous_num_rows = @num_rows
    Embulk.logger.info { "embulk-output-bigquery: num_rows #{num_format(@num_rows)} (#{num_format(speed)} rows/sec)" }
  end
  @num_rows
end
#close ⇒ Object
Source lines 62–65:
# File 'lib/embulk/output/bigquery/file_writer.rb', line 62
# Closes the output stream returned by #io and returns it, so callers can
# still inspect it (e.g. its path) afterwards.
#
# Closing is best-effort and idempotent:
# * a stream that is already closed is left alone (double close is a no-op),
# * any other error raised while closing is logged as a warning instead of
#   being silently discarded — for gzip output, close is what finalizes the
#   compressed data, so a failure there must be visible.
#
# Note: if nothing has been written yet, #io creates the file first.
#
# @return [IO, Zlib::GzipWriter] the (now closed) stream
def close
  stream = io
  begin
    stream.close unless stream.closed?
  rescue StandardError => e
    Embulk.logger.warn { "embulk-output-bigquery: failed to close output: #{e.class}: #{e.message}" }
  end
  stream
end
#io ⇒ Object
Source lines 35–49:
# File 'lib/embulk/output/bigquery/file_writer.rb', line 35
# Returns the output stream, creating it lazily on first use and memoizing
# it in @io.
#
# The file name is built by concatenating task['path_prefix'],
# task['sequence_format'] and task['file_ext'] into a sprintf template,
# which is filled with the current process id and the current thread's
# object_id. A file already at that path is removed (best-effort) before a
# new one is opened with #open in 'w' mode.
#
# NOTE(review): path_prefix and file_ext become part of the format template,
# so a literal '%' in them would be interpreted by sprintf — confirm
# callers never pass one.
#
# @return [File, Zlib::GzipWriter] the stream returned by #open
def io
  @io ||= begin
    template = "#{@task['path_prefix']}#{@task['sequence_format']}#{@task['file_ext']}"
    path = format(template, Process.pid, Thread.current.object_id)
    if File.exist?(path)
      Embulk.logger.warn { "embulk-output-bigquery: unlink already existing #{path}" }
      begin
        File.unlink(path)
      rescue StandardError
        # Best-effort: opening with 'w' below truncates the file anyway.
        nil
      end
    end
    Embulk.logger.info { "embulk-output-bigquery: create #{path}" }
    open(path, 'w')
  end
end
#num_format(number) ⇒ Object
Source lines 91–93:
# File 'lib/embulk/output/bigquery/file_writer.rb', line 91
# Formats a number with comma thousands separators for progress logs,
# e.g. 1234567 => "1,234,567" and 12345.6 => "12,345.6".
#
# Only the part before the first '.' is grouped. Running the grouping regex
# over the whole string would also insert commas into long fractional parts
# (1234.5678 would become "1,234.5,678").
#
# @param number [#to_s] value to format (Integer or Float in practice)
# @return [String]
def num_format(number)
  integer_part, point, fraction = number.to_s.partition('.')
  "#{integer_part.gsub(/(\d)(?=(\d{3})+(?!\d))/, '\1,')}#{point}#{fraction}"
end
#open(path, mode = 'w') ⇒ Object
Source lines 51–60:
# File 'lib/embulk/output/bigquery/file_writer.rb', line 51
# Opens +path+ with +mode+ and, when task['compression'] is "gzip"
# (case-insensitive), wraps the file in a Zlib::GzipWriter.
#
# This instance method shadows Kernel#open inside FileWriter; #io and
# #reopen rely on it.
#
# NOTE(review): task['compression'] is downcased unconditionally, so it is
# assumed to always be present.
#
# @param path [String] file to open
# @param mode [String] File.open mode ('w' create/truncate, 'a' append)
# @return [File, Zlib::GzipWriter]
def open(path, mode = 'w')
  file_io = File.open(path, mode)
  return file_io unless @task['compression'].downcase == 'gzip'

  Zlib::GzipWriter.new(file_io)
end
#reopen ⇒ Object
Source lines 67–69:
# File 'lib/embulk/output/bigquery/file_writer.rb', line 67
# Replaces the current stream with a new one opened in append mode ('a') on
# the same path, so later writes extend the existing file.
#
# NOTE(review): the previous stream is not closed here; callers presumably
# call #close first — confirm. With gzip compression a new GzipWriter is
# created, so the file presumably ends up with an additional gzip member.
#
# @return [File, Zlib::GzipWriter] the new stream (also stored in @io)
def reopen
  current_path = io.path
  @io = open(current_path, 'a')
end
#to_csv(record) ⇒ Object
Source lines 75–79:
# File 'lib/embulk/output/bigquery/file_writer.rb', line 75
# Converts each column value with the converter at the same index and
# renders the row as one newline-terminated CSV line (Array#to_csv).
#
# @param record [Array] column values in schema order
# @return [String]
def to_csv(record)
  converted = record.each_with_index.map { |value, idx| @converters[idx].call(value) }
  converted.to_csv
end
#to_jsonl(record) ⇒ Object
Source lines 81–89:
# File 'lib/embulk/output/bigquery/file_writer.rb', line 81
# Renders a record as one line of newline-delimited JSON. The result is an
# object keyed by schema column name (@schema.names, matched by position)
# whose values come from the per-column converters.
#
# @param record [Array] column values in schema order
# @return [String] JSON object followed by "\n"
def to_jsonl(record)
  names = @schema.names
  row = record.each_with_index.each_with_object({}) do |(value, idx), acc|
    acc[names[idx]] = @converters[idx].call(value)
  end
  row.to_json + "\n"
end
#to_payload(record) ⇒ Object
Source lines 71–73:
# File 'lib/embulk/output/bigquery/file_writer.rb', line 71
# Emits the value of the payload column (task['payload_column_index'],
# captured in #initialize) as-is, followed by a newline. The value is
# stringified by interpolation, so nil yields an empty line.
#
# @param record [Array] column values
# @return [String]
def to_payload(record)
  payload = record[@payload_column_index]
  "#{payload}\n"
end