Class: Embulk::Output::Vertica::OutputThread

Inherits:
Object
  • Object
show all
Defined in:
lib/embulk/output/vertica/output_thread.rb

Constant Summary collapse

PIPE_BUF =
4096

Instance Method Summary collapse

Constructor Details

#initialize(task) ⇒ OutputThread

Returns a new instance of OutputThread.



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/embulk/output/vertica/output_thread.rb', line 64

# Prepares the state shared between the calling thread and the output thread.
#
# Creates a one-slot SizedQueue, zeroes the row counters, records the
# constructing thread as the outer thread, and picks the page writer
# (#write_gzip when task['compress'] is 'GZIP', #write_uncompressed otherwise).
#
# @param task [#[]] task settings indexed by string keys; only 'compress' is read here
def initialize(task)
  @task = task
  @outer_thread = Thread.current
  @queue = SizedQueue.new(1)
  @thread_active = false

  # Row counters and progress-log bookkeeping all start from zero / "now".
  @num_input_rows = 0
  @num_output_rows = 0
  @num_rejected_rows = 0
  @previous_num_input_rows = 0
  @progress_log_timer = Time.now

  writer_name = task['compress'] == 'GZIP' ? :write_gzip : :write_uncompressed
  @write_proc = method(writer_name)
end

Instance Method Details

#abort_on_errorObject



314
315
316
# File 'lib/embulk/output/vertica/output_thread.rb', line 314

# COPY statement fragment for the abort-on-error option.
#
# @return [String] ' ABORT ON ERROR' when task['abort_on_error'] is truthy, '' otherwise
def abort_on_error
  return ' ABORT ON ERROR' if @task['abort_on_error']

  ''
end

#close(jv) ⇒ Object



240
241
242
243
244
245
246
# File 'lib/embulk/output/vertica/output_thread.rb', line 240

# Closes the given connection, tolerating a java.sql.SQLException.
#
# A SQLException is only logged at debug level and not re-raised; any other
# exception propagates.
#
# @param jv [#close] connection object to close
# @return [Object] whatever jv.close returns, or the logger result when rescued
def close(jv)
  jv.close
rescue java.sql.SQLException => e # The connection is closed
  Embulk.logger.debug "embulk-output-vertica: #{e.class} #{e.message}"
end

#commitObject



261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
# File 'lib/embulk/output/vertica/output_thread.rb', line 261

# Asks the output thread to finish and builds the task report.
#
# Marks the thread inactive, then, if the thread is still alive, pushes the
# 'finish' marker and waits up to task['finish_timeout'] for it to end. A
# thread that is still alive after the wait is killed.
#
# @return [Hash] 'num_input_rows', 'num_output_rows', 'num_rejected_rows' and
#   'success' (false when the thread was already dead or had to be killed)
def commit
  Embulk.logger.debug "embulk-output-vertica: output_thread commit"
  @thread_active = false

  success =
    if @thread.alive?
      Embulk.logger.debug { "embulk-output-vertica: push finish with finish_timeout:#{@task['finish_timeout']}" }
      @queue.push('finish')
      Thread.pass # give the output thread a chance to pick up the marker
      @thread.join(@task['finish_timeout'])
      finished_in_time = !@thread.alive?
      unless finished_in_time
        @thread.kill
        Embulk.logger.error "embulk-output-vertica: finish_timeout #{@task['finish_timeout']}sec exceeded, thread is killed forcely"
      end
      finished_in_time
    else
      Embulk.logger.error "embulk-output-vertica: thread died accidently"
      false
    end

  {
    'num_input_rows' => @num_input_rows,
    'num_output_rows' => @num_output_rows,
    'num_rejected_rows' => @num_rejected_rows,
    'success' => success
  }
end

#compressObject



306
307
308
# File 'lib/embulk/output/vertica/output_thread.rb', line 306

# COPY statement fragment for the compression setting.
#
# @return [String] a single space followed by the string form of task['compress']
def compress
  format(' %s', @task['compress'])
end

#copy(jv, sql, &block) ⇒ Object



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/embulk/output/vertica/output_thread.rb', line 163

# Streams dequeued pages into a COPY statement until 'finish' is dequeued,
# then commits.
#
# If the very first dequeued item is 'finish', nothing is sent and
# [0, [], nil] is returned without calling jv.copy or jv.commit.
# Otherwise every page is written through @write_proc, the instance row
# counters are updated from the values returned by jv.copy, and jv.commit
# is called.
#
# @param jv [#copy, #commit] connection object
# @param sql [String] COPY statement passed to jv.copy
# @param block unused by this method
# @return [Array] [num_output_rows, rejected_row_nums, last_record], where
#   last_record is the last record yielded by the writer (nil if none)
def copy(jv, sql, &block)
  Embulk.logger.debug "embulk-output-vertica: copy, waiting a first message"

  last_record = nil
  first_page = dequeue
  return [0, [], last_record] if first_page == 'finish'

  Embulk.logger.debug "embulk-output-vertica: #{sql}"

  # Shared callback: remember the most recent record handed to the writer.
  remember = proc { |record| last_record = record }

  num_output_rows, rejected_row_nums = jv.copy(sql) do |stdin, _stream|
    page = first_page
    until page == 'finish'
      @write_proc.call(stdin, page, &remember)
      page = dequeue
    end
  end

  @num_output_rows += num_output_rows
  @num_rejected_rows += rejected_row_nums.size
  Embulk.logger.info { "embulk-output-vertica: COMMIT!" }
  jv.commit
  Embulk.logger.debug { "embulk-output-vertica: COMMITTED!" }

  Embulk.logger.debug { "embulk-output-vertica: rejected_row_nums: #{rejected_row_nums}" } if rejected_row_nums.size > 0

  [num_output_rows, rejected_row_nums, last_record]
end

#copy_modeObject



310
311
312
# File 'lib/embulk/output/vertica/output_thread.rb', line 310

# COPY statement fragment for the copy mode.
#
# @return [String] a single space followed by the string form of task['copy_mode']
def copy_mode
  format(' %s', @task['copy_mode'])
end

#copy_sqlObject

private



290
291
292
# File 'lib/embulk/output/vertica/output_thread.rb', line 290

# Builds (once) and returns the COPY ... FROM STDIN statement for the
# temporary table.
#
# The statement is memoized in @copy_sql, so later changes to the task are
# not reflected after the first call.
#
# @return [String] the COPY statement, always ending in ' NO COMMIT'
def copy_sql
  return @copy_sql if @copy_sql

  target = "#{quoted_schema}.#{quoted_temp_table}"
  options = [compress, fjsonparser, copy_mode, abort_on_error].join
  @copy_sql = "COPY #{target} FROM STDIN#{options} NO COMMIT"
end

#dequeueArray, String

Returns:

  • (Array)

    dequeued json_page

  • (String)

    the marker ‘finish’, dequeued to signal that output should finish



154
155
156
157
158
159
160
161
# File 'lib/embulk/output/vertica/output_thread.rb', line 154

# Pops the next item from the queue, waiting at most task['dequeue_timeout'].
#
# @return [Array, String] the dequeued json_page, or 'finish' when the finish
#   marker was dequeued
# @raise [DequeueTimeoutError] when nothing could be popped within the timeout
def dequeue
  Embulk.logger.trace { "embulk-output-vertica: @queue.pop with dequeue_timeout:#{@task['dequeue_timeout']}" }
  popped = Timeout.timeout(@task['dequeue_timeout'], DequeueTimeoutError) { @queue.pop }
  Embulk.logger.trace { "embulk-output-vertica: dequeued" }
  if popped == 'finish'
    Embulk.logger.debug { "embulk-output-vertica: dequeued finish" }
  end
  popped
end

#dequeue_allObject



233
234
235
236
237
238
# File 'lib/embulk/output/vertica/output_thread.rb', line 233

# Drains the queue until it reports no remaining items.
#
# @return [nil]
def dequeue_all
  Embulk.logger.debug "embulk-output-vertica: dequeue all"
  # Drain everything: some callers might still be blocked in @queue.push and
  # need room in the queue to be released.
  @queue.pop until @queue.empty?
end

#enqueue(json_page) ⇒ Object



96
97
98
99
100
101
102
103
104
105
# File 'lib/embulk/output/vertica/output_thread.rb', line 96

# Hands a page to the output thread through the queue.
#
# The page is pushed only while the thread is flagged active and is alive;
# otherwise a thread dump is requested and an error is raised.
#
# @param json_page [Object] page to push onto the queue
# @return [Object] the result of @queue.push
# @raise [RuntimeError] when the output thread is inactive or no longer alive
def enqueue(json_page)
  accepting = @thread_active && @thread.alive?
  unless accepting
    Embulk.logger.info { "embulk-output-vertica: thread is dead, but still trying to enqueue" }
    thread_dump
    raise RuntimeError, "embulk-output-vertica: thread is died, but still trying to enqueue"
  end

  Embulk.logger.trace { "embulk-output-vertica: enqueue" }
  @queue.push(json_page)
end

#fjsonparserObject



318
319
320
# File 'lib/embulk/output/vertica/output_thread.rb', line 318

# COPY statement fragment selecting the fjsonparser parser.
#
# @return [String] ' PARSER fjsonparser(...)' with the value of
#   #reject_on_materialized_type_error between the parentheses
def fjsonparser
  parser_args = reject_on_materialized_type_error
  " PARSER fjsonparser(#{parser_args})"
end

#num_format(number) ⇒ Object



148
149
150
# File 'lib/embulk/output/vertica/output_thread.rb', line 148

# Formats a number with comma thousands separators for log output.
#
# Only the text before the first decimal point is grouped; everything from
# the decimal point onwards is kept verbatim, so fractional digits never
# receive separators (e.g. 1234.5678 => "1,234.5678").
#
# @param number [#to_s] value to format
# @return [String] the string form of number with the integer part grouped
def num_format(number)
  integer_part, point, fraction = number.to_s.partition('.')
  integer_part.gsub(/(\d)(?=(\d{3})+(?!\d))/, '\1,') + point + fraction
end

#quoted_schemaObject



294
295
296
# File 'lib/embulk/output/vertica/output_thread.rb', line 294

# Schema name from the task, quoted via Jvertica.quote_identifier.
#
# @return [Object] the result of ::Jvertica.quote_identifier for task['schema']
def quoted_schema
  schema_name = @task['schema']
  ::Jvertica.quote_identifier(schema_name)
end

#quoted_tableObject



298
299
300
# File 'lib/embulk/output/vertica/output_thread.rb', line 298

# Table name from the task, quoted via Jvertica.quote_identifier.
#
# @return [Object] the result of ::Jvertica.quote_identifier for task['table']
def quoted_table
  table_name = @task['table']
  ::Jvertica.quote_identifier(table_name)
end

#quoted_temp_tableObject



302
303
304
# File 'lib/embulk/output/vertica/output_thread.rb', line 302

# Temporary table name from the task, quoted via Jvertica.quote_identifier.
#
# @return [Object] the result of ::Jvertica.quote_identifier for task['temp_table']
def quoted_temp_table
  temp_table_name = @task['temp_table']
  ::Jvertica.quote_identifier(temp_table_name)
end

#reject_on_materialized_type_errorObject



322
323
324
# File 'lib/embulk/output/vertica/output_thread.rb', line 322

# Parser argument enabling rejection on materialized type errors.
#
# @return [String] 'reject_on_materialized_type_error=true' when
#   task['reject_on_materialized_type_error'] is truthy, '' otherwise
def reject_on_materialized_type_error
  if @task['reject_on_materialized_type_error']
    'reject_on_materialized_type_error=true'
  else
    ''
  end
end

#rollback(jv) ⇒ Object



248
249
250
251
252
253
254
# File 'lib/embulk/output/vertica/output_thread.rb', line 248

# Rolls back the given connection, tolerating a java.sql.SQLException.
#
# A SQLException is only logged at debug level and not re-raised; any other
# exception propagates.
#
# @param jv [#rollback] connection object to roll back
# @return [Object] whatever jv.rollback returns, or the logger result when rescued
def rollback(jv)
  jv.rollback
rescue java.sql.SQLException => e # The connection is closed
  Embulk.logger.debug "embulk-output-vertica: #{e.class} #{e.message}"
end

#runObject



196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# File 'lib/embulk/output/vertica/output_thread.rb', line 196

# Body of the output thread: connects, streams queued pages through #copy,
# and handles failures.
#
# Flow, as implemented below:
# * connect with Vertica.connect(@task) and call #copy with #copy_sql;
# * on java.sql.SQLDataException or any other StandardError raised by #copy,
#   log a warning, roll back, and re-raise;
# * close the connection whenever one was actually opened;
# * on a timeout error, stop accepting pages, drain the queue, request a
#   thread dump, and exit(1);
# * on any other exception, stop accepting pages, drain the queue, and
#   raise the exception in @outer_thread.
def run
  Embulk.logger.debug { "embulk-output-vertica: thread started" }
  jv = nil # stays nil when Vertica.connect itself raises
  begin
    jv = Vertica.connect(@task)
    begin
      num_output_rows, rejected_row_nums, last_record = copy(jv, copy_sql)
      Embulk.logger.debug { "embulk-output-vertica: thread finished" }
    rescue java.sql.SQLDataException => e
      # NOTE: rejected_row_nums and last_record are only assigned when #copy
      # returns, so they are still nil whenever #copy itself raised.
      if @task['reject_on_materialized_type_error'] and e.message =~ /Rejected by user-defined parser/
        Embulk.logger.warn "embulk-output-vertica: ROLLBACK! some of column types and values types do not fit #{rejected_row_nums}"
      else
        Embulk.logger.warn "embulk-output-vertica: ROLLBACK! #{rejected_row_nums}"
      end
      Embulk.logger.info { "embulk-output-vertica: last_record: #{last_record}" }
      rollback(jv)
      raise e
    rescue => e
      Embulk.logger.warn "embulk-output-vertica: ROLLBACK! #{e.class} #{e.message} #{e.backtrace.join("\n  ")}"
      rollback(jv)
      raise e
    end
  ensure
    # Only close a connection that was opened; calling close on nil would
    # raise here and replace the original connection error.
    close(jv) if jv
  end
rescue Timeout::Error => e # the top-level TimeoutError alias no longer exists on Ruby 3.0+
  Embulk.logger.error "embulk-output-vertica: UNKNOWN TIMEOUT!! #{e.class}"
  @thread_active = false # not to be enqueued any more
  dequeue_all
  thread_dump
  exit(1)
rescue Exception => e
  Embulk.logger.error "embulk-output-vertica: UNKNOWN ERROR! #{e.class} #{e.message} #{e.backtrace.join("\n  ")}"
  @thread_active = false # not to be enqueued any more
  dequeue_all
  @outer_thread.raise e
end

#startObject



256
257
258
259
# File 'lib/embulk/output/vertica/output_thread.rb', line 256

# Launches the output thread running #run and marks it active.
#
# @return [true]
def start
  @thread = Thread.new { run }
  @thread_active = true
end

#thread_dumpObject



83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/embulk/output/vertica/output_thread.rb', line 83

# Sends SIGQUIT to the current process at most once per process.
#
# A global flag records that the signal has already been sent; later calls
# return immediately. Per the log message, the signal is intended to
# produce a thread dump. The method then sleeps for one second.
def thread_dump
  return if $embulk_output_vertica_thread_dumped

  $embulk_output_vertica_thread_dumped = true
  pid = Process.pid
  Embulk.logger.debug "embulk-output-vertica: kill -3 #{pid} (Thread dump)"
  begin
    Process.kill :QUIT, pid
  rescue SignalException
    # ignored on purpose: the signal is only sent for its diagnostic output
  ensure
    sleep 1
  end
end

#write_buf(buf, json_page, &block) ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/embulk/output/vertica/output_thread.rb', line 132

# Appends every record of a page to buf, one record per line, and logs
# throughput roughly every 10 seconds.
#
# Each record is yielded to the block (when given) before it is appended,
# and @num_input_rows is incremented per record.
#
# @param buf [#<<] destination buffer (a String or a Zlib::Deflate in this class)
# @param json_page [#each] records to append
# @yieldparam record [Object] the record about to be appended
# @return [nil, Object] nil unless the progress line was logged
def write_buf(buf, json_page, &block)
  json_page.each do |row|
    yield(row) if block
    Embulk.logger.trace { "embulk-output-vertica: record #{row}" }
    buf << row
    buf << "\n"
    @num_input_rows += 1
  end

  now = Time.now
  return unless @progress_log_timer < now - 10 # once in 10 seconds

  elapsed = (now - @progress_log_timer).to_f
  rows_since_last_log = @num_input_rows - @previous_num_input_rows
  speed = (rows_since_last_log / elapsed).round(1)
  @progress_log_timer = now
  @previous_num_input_rows = @num_input_rows
  Embulk.logger.info { "embulk-output-vertica: num_input_rows #{num_format(@num_input_rows)} (#{num_format(speed)} rows/sec)" }
end

#write_gzip(io, page, &block) ⇒ Object



107
108
109
110
111
# File 'lib/embulk/output/vertica/output_thread.rb', line 107

# Compresses a page with a fresh Zlib::Deflate stream and writes the result.
#
# The page is fed through #write_buf into the deflater, and the finished
# output is passed to #write_io.
#
# @param io [Object] destination passed through to #write_io
# @param page [Object] page passed through to #write_buf
# @param block forwarded to #write_buf
# @return [Object] the result of #write_io
def write_gzip(io, page, &block)
  deflater = Zlib::Deflate.new
  write_buf(deflater, page, &block)
  compressed = deflater.finish
  write_io(io, compressed)
end

#write_io(io, str) ⇒ Object



121
122
123
124
125
126
127
128
129
130
# File 'lib/embulk/output/vertica/output_thread.rb', line 121

# Writes str to io in PIPE_BUF-sized chunks, each guarded by
# task['write_timeout'].
#
# The string is re-tagged as binary in place so that slicing is byte-based.
# Nothing is written for an empty string, and no trailing zero-length write
# is issued when the length is an exact multiple of PIPE_BUF.
#
# @param io [#write] destination stream
# @param str [String] data to write; its encoding is changed to ASCII-8BIT
# @return [nil]
# @raise [WriteTimeoutError] when a single chunk write exceeds the timeout
def write_io(io, str)
  str = str.force_encoding('ASCII-8BIT')
  total = str.bytesize
  offset = 0
  # Split str into small chunks so that a single write is less likely to block
  # for long (the original note: max size of pipe buf is 64k bytes on Linux,
  # Mac at default; the chunk size used here is the PIPE_BUF constant).
  while offset < total
    chunk = str[offset, PIPE_BUF]
    Embulk.logger.trace { "embulk-output-vertica: io.write with write_timeout:#{@task['write_timeout']}" }
    Timeout.timeout(@task['write_timeout'], WriteTimeoutError) { io.write(chunk) }
    offset += PIPE_BUF
  end
end

#write_uncompressed(io, page, &block) ⇒ Object



113
114
115
116
117
# File 'lib/embulk/output/vertica/output_thread.rb', line 113

# Serializes a page into a fresh string buffer and writes it as-is.
#
# The page is fed through #write_buf into the buffer, which is then passed
# to #write_io.
#
# @param io [Object] destination passed through to #write_io
# @param page [Object] page passed through to #write_buf
# @param block forwarded to #write_buf
# @return [Object] the result of #write_io
def write_uncompressed(io, page, &block)
  text = ''.dup
  write_buf(text, page, &block)
  write_io(io, text)
end