Class: Embulk::Output::Vertica::OutputThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/embulk/output/vertica/output_thread.rb

Instance Method Summary collapse

Constructor Details

#initialize(task, schema, size) ⇒ OutputThreadPool

Returns a new instance of OutputThreadPool.



7
8
9
10
11
12
13
14
# File 'lib/embulk/output/vertica/output_thread.rb', line 7

# Builds the pool: the value converters for +schema+ and +size+ output threads.
#
# @param task [Hash] hash-like task configuration; 'default_timezone' and
#   'column_options' are read here, and the whole task is handed to each OutputThread
# @param schema [Object] schema passed to ValueConverterFactory.create_converters
# @param size [Integer] number of OutputThread instances to create
def initialize(task, schema, size)
  @task = task
  @size = size
  @schema = schema
  @converters = ValueConverterFactory.create_converters(schema, task['default_timezone'], task['column_options'])
  @output_threads = size.times.map { OutputThread.new(task) }
  # Index of the output thread that receives the next enqueued page.
  @current_index = 0
  # Created here so #enqueue never synchronizes on nil, even if it runs before #start.
  @mutex = Mutex.new
end

Instance Method Details

#commit ⇒ Object



32
33
34
# File 'lib/embulk/output/vertica/output_thread.rb', line 32

# Calls #commit on every output thread in the pool.
#
# @return [Array] the value returned by each thread's #commit, in pool order
def commit
  @output_threads.map(&:commit)
end

#enqueue(page) ⇒ Object



16
17
18
19
20
21
22
23
24
25
# File 'lib/embulk/output/vertica/output_thread.rb', line 16

# Serializes every record of +page+ with #to_json and hands the resulting
# array to one output thread, then advances the round-robin index.
#
# @param page [#each] collection of records; each record is passed to #to_json
# @return [Integer] the updated @current_index (value of the synchronized block)
def enqueue(page)
  serialized = []
  page.each { |row| serialized << to_json(row) }

  # Selecting the target thread and advancing the index happen under one lock
  # so concurrent callers cannot pick the same slot or skip one.
  @mutex.synchronize do
    target = @output_threads[@current_index]
    target.enqueue(serialized)
    @current_index = (@current_index + 1) % @size
  end
end

#start ⇒ Object



27
28
29
30
# File 'lib/embulk/output/vertica/output_thread.rb', line 27

# Creates the mutex used by #enqueue and calls #start on the first @size
# output threads, in index order.
#
# @return [Array] the value returned by each thread's #start
def start
  @mutex = Mutex.new
  Array.new(@size) { |slot| @output_threads[slot].start }
end

#to_json(record) ⇒ Object



36
37
38
39
40
41
42
43
44
# File 'lib/embulk/output/vertica/output_thread.rb', line 36

# Turns one record into the JSON text queued for the output threads.
#
# When the task's 'json_payload' option is truthy, the first element of the
# record is returned unchanged. Otherwise each value is paired with its column
# name from @schema.names, run through that column's converter, and the
# resulting hash is serialized with Hash#to_json.
#
# @param record [Array] values in the same order as @schema.names
# @return [Object] record.first in 'json_payload' mode, otherwise a JSON String
def to_json(record)
  return record.first if @task['json_payload']

  # Build the hash directly instead of flattening the pairs and splatting them
  # into Hash[]: flatten! returns nil when it changes nothing.
  row = @schema.names.zip(record).each_with_object({}) do |(column_name, value), converted|
    converted[column_name] = @converters[column_name].call(value)
  end
  row.to_json
end