Class: Embulk::Output::Vertica::OutputThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/embulk/output/vertica/output_thread.rb

Instance Method Summary collapse

Constructor Details

#initialize(task, schema, size) ⇒ OutputThreadPool

Returns a new instance of OutputThreadPool.



16
17
18
19
20
21
22
23
# File 'lib/embulk/output/vertica/output_thread.rb', line 16

def initialize(task, schema, size)
  @task = task
  @size = size
  @schema = schema
  @converters = ValueConverterFactory.create_converters(schema, task['default_timezone'], task['column_options'])
  @output_threads = size.times.map { OutputThread.new(task) }
  @current_index = 0
end

Instance Method Details

#commitObject



41
42
43
44
45
46
47
48
49
50
# File 'lib/embulk/output/vertica/output_thread.rb', line 41

def commit
  Embulk.logger.debug "embulk-output-vertica: commit"
  task_reports = @mutex.synchronize do
    @size.times.map {|i| @output_threads[i].commit }
  end
  unless task_reports.all? {|task_report| task_report['success'] }
    raise CommitError, "some of output_threads failed to commit"
  end
  task_reports
end

#enqueue(page) ⇒ Object



25
26
27
28
29
30
31
32
33
34
# File 'lib/embulk/output/vertica/output_thread.rb', line 25

def enqueue(page)
  json_page = []
  page.each do |record|
    json_page << to_json(record)
  end
  @mutex.synchronize do
    @output_threads[@current_index].enqueue(json_page)
    @current_index = (@current_index + 1) % @size
  end
end

#startObject



36
37
38
39
# File 'lib/embulk/output/vertica/output_thread.rb', line 36

def start
  @mutex = Mutex.new
  @size.times.map {|i| @output_threads[i].start }
end

#to_json(record) ⇒ Object



52
53
54
55
56
57
58
59
60
# File 'lib/embulk/output/vertica/output_thread.rb', line 52

def to_json(record)
  if @task['json_payload']
    record.first
  else
    Hash[*(@schema.names.zip(record).map do |column_name, value|
      [column_name, @converters[column_name].call(value)]
    end.flatten!(1))].to_json
  end
end