Class: Embulk::Output::Bigquery::FileWriter

Inherits:
Object
  • Object
show all
Defined in:
lib/embulk/output/bigquery/file_writer.rb

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(task, schema, index, converters = nil) ⇒ FileWriter

Returns a new instance of FileWriter.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/embulk/output/bigquery/file_writer.rb', line 12

# Builds a writer that formats Embulk records and writes them to a local file.
#
# @param task [Hash] plugin task; this method reads 'progress_log_interval',
#   'payload_column_index' and 'source_format' from it
# @param schema [Object] Embulk schema; #to_jsonl reads its +names+
# @param index [Object] stored as @index; not otherwise used here
# @param converters [Array, nil] per-column value converters; when nil they are
#   built with ValueConverterFactory.create_converters(task, schema)
def initialize(task, schema, index, converters = nil)
  @task   = task
  @schema = schema
  @index  = index
  @converters = converters || ValueConverterFactory.create_converters(task, schema)
  @num_rows = 0

  # Progress-logging state only exists when an interval was configured.
  if (interval = @task['progress_log_interval'])
    @progress_log_interval = interval
    @progress_log_timer = Time.now
    @previous_num_rows = 0
  end

  # A payload column wins over source_format: the raw column value becomes the
  # line. Otherwise 'csv' (case-insensitive) selects CSV and anything else JSONL.
  @formatter_proc =
    if (payload_index = @task['payload_column_index'])
      @payload_column_index = payload_index
      method(:to_payload)
    elsif @task['source_format'].downcase == 'csv'
      method(:to_csv)
    else
      method(:to_jsonl)
    end
end

Instance Attribute Details

#num_rows ⇒ Object (readonly)

Returns the value of attribute num_rows.



10
11
12
# File 'lib/embulk/output/bigquery/file_writer.rb', line 10

# Number of records written through #add so far; #initialize starts it at 0.
#
# @return [Integer]
def num_rows
  @num_rows
end

Instance Method Details

#add(page) ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/embulk/output/bigquery/file_writer.rb', line 98

# Formats every record of +page+ with the configured formatter proc and writes
# the resulting lines to the output stream. Afterwards it calls #show_progress
# when a progress_log_interval is configured.
#
# @param page [Enumerable] records to write; each yielded record is passed to
#   the formatter proc
# @return [Integer] total number of rows written by this writer so far
def add(page)
  # Handing the writes to a separate IO thread through a SizedQueue was tried
  # and measured slower, so records are written inline.
  out = io
  page.each do |record|
    Embulk.logger.trace { "embulk-output-bigquery: record #{record}" }
    line = @formatter_proc.call(record)
    Embulk.logger.trace { "embulk-output-bigquery: formatted_record #{line.chomp}" }
    out.write(line)
    @num_rows += 1
  end
  show_progress if @task['progress_log_interval']
  @num_rows
end

#close ⇒ Object



65
66
67
68
# File 'lib/embulk/output/bigquery/file_writer.rb', line 65

# Closes the output stream and returns it. The stream is obtained through #io,
# so it is created first if it was never opened.
#
# Closing is best-effort. A stream that is already closed is left alone. A
# failure while closing is logged as a warning instead of being raised, rather
# than being silently discarded.
#
# @return [IO, Zlib::GzipWriter] the (now closed) output stream
def close
  stream = io
  begin
    stream.close unless stream.closed?
  rescue => e
    # Still best-effort, but no longer silent: a failed close can leave an
    # incomplete output file behind.
    Embulk.logger.warn { "embulk-output-bigquery: failed to close output file: #{e.class}: #{e.message}" }
  end
  stream
end

#io ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/embulk/output/bigquery/file_writer.rb', line 38

# Returns the output stream, opening it on first use (memoized in @io).
#
# The path is path_prefix + sequence_format + file_ext. Only sequence_format
# is treated as a format string; it is filled with the process id and the
# current thread's object_id. If a file already exists at that path, it is
# removed before the new one is opened via #open in 'w' mode.
#
# @return [IO, Zlib::GzipWriter] the memoized output stream
def io
  return @io if @io

  # Format only sequence_format. Interpolating path_prefix/file_ext into the
  # format string made any '%' in them (e.g. a directory named "50%off") be
  # parsed as a directive, which raised ArgumentError or mangled the path.
  # .to_s keeps a missing sequence_format meaning "no sequence", as before.
  sequence = sprintf(@task['sequence_format'].to_s, Process.pid, Thread.current.object_id)
  path = "#{@task['path_prefix']}#{sequence}#{@task['file_ext']}"
  if File.exist?(path)
    Embulk.logger.warn { "embulk-output-bigquery: unlink already existing #{path}" }
    begin
      File.unlink(path)
    rescue SystemCallError
      # Best-effort: opening in 'w' mode below truncates an existing file anyway.
    end
  end
  Embulk.logger.info { "embulk-output-bigquery: create #{path}" }

  @io = open(path, 'w')
end

#num_format(number) ⇒ Object



94
95
96
# File 'lib/embulk/output/bigquery/file_writer.rb', line 94

# Formats +number+ for log messages by inserting a comma every three digits of
# its integer part, e.g. 1234567 => "1,234,567" and 12345.6 => "12,345.6".
#
# Only the leading (optionally negative) run of digits is grouped, so
# fractional digits stay intact: 1234.5678 => "1,234.5678", not "1,234.5,678".
# Strings without a leading digit run (e.g. "NaN") are returned unchanged.
#
# @param number [#to_s] typically an Integer or Float
# @return [String]
def num_format(number)
  number.to_s.sub(/\A-?\d+/) { |int_part| int_part.gsub(/(\d)(?=(\d{3})+(?!\d))/, '\1,') }
end

#open(path, mode = 'w') ⇒ Object



54
55
56
57
58
59
60
61
62
63
# File 'lib/embulk/output/bigquery/file_writer.rb', line 54

# Opens +path+ with +mode+. When the task's 'compression' is gzip
# (case-insensitive), the file is wrapped in a Zlib::GzipWriter.
#
# @param path [String] file to open
# @param mode [String] File.open mode ('w' by default; #reopen passes 'a')
# @return [File, Zlib::GzipWriter]
def open(path, mode = 'w')
  file = File.open(path, mode)
  return Zlib::GzipWriter.new(file) if @task['compression'].downcase == 'gzip'

  file
end

#reopen ⇒ Object



70
71
72
# File 'lib/embulk/output/bigquery/file_writer.rb', line 70

# Replaces the output stream with a fresh handle on the same file, opened in
# append mode through #open (so the compression setting is applied again).
#
# The previous handle is closed first. Otherwise its buffered data could be
# flushed after, and interleaved with, data written through the new handle,
# and its descriptor would stay open.
#
# @return [IO, Zlib::GzipWriter] the new output stream
def reopen
  previous = io
  # Read the path before closing so it does not depend on a closed stream.
  path = previous.path
  previous.close unless previous.closed?
  @io = open(path, 'a')
end

#to_csv(record) ⇒ Object



78
79
80
81
82
# File 'lib/embulk/output/bigquery/file_writer.rb', line 78

# Converts each column value with the converter at the same index and renders
# the row as a single CSV line (Array#to_csv, which appends the row separator).
#
# @param record [Array] column values, indexed like @converters
# @return [String]
def to_csv(record)
  converted = record.each_with_index.map { |value, idx| @converters[idx].call(value) }
  converted.to_csv
end

#to_jsonl(record) ⇒ Object



84
85
86
87
88
89
90
91
92
# File 'lib/embulk/output/bigquery/file_writer.rb', line 84

# Builds a Hash from @schema.names to converted column values and renders it as
# one JSON Lines entry (JSON object followed by "\n").
#
# @param record [Array] column values, indexed like @schema.names and @converters
# @return [String]
def to_jsonl(record)
  names = @schema.names
  row = record.each_with_index.each_with_object({}) do |(value, idx), acc|
    acc[names[idx]] = @converters[idx].call(value)
  end
  row.to_json + "\n"
end

#to_payload(record) ⇒ Object



74
75
76
# File 'lib/embulk/output/bigquery/file_writer.rb', line 74

# Emits the raw payload column as one line: the value at @payload_column_index,
# string-interpolated, followed by "\n".
#
# @param record [Array] column values
# @return [String]
def to_payload(record)
  payload = record[@payload_column_index]
  "#{payload}\n"
end