Class: Bricolage::StreamingLoad::Job::DataConnection

Inherits:
Object
  • Object
show all
Includes:
SQLUtils
Defined in:
lib/bricolage/streamingload/job.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(ds, log_table:, logger: ds.logger) ⇒ DataConnection

Returns a new instance of DataConnection.



203
204
205
206
207
208
# File 'lib/bricolage/streamingload/job.rb', line 203

# Stores the collaborators for later use; nothing is opened here.
#
# @param ds data source object; kept as-is and also used to supply the
#   default logger via +ds.logger+
# @param log_table name of the load log table, kept for the SQL built by
#   the other methods of this object
# @param logger logger to use instead of the data source's own logger
def initialize(ds, log_table:, logger: ds.logger)
  # No live connection yet; #open is the method that assigns one.
  @connection = nil
  @logger = logger
  @log_table = log_table
  @ds = ds
end

Class Method Details

.open(ds, log_table:, logger: ds.logger, &block) ⇒ Object



199
200
201
# File 'lib/bricolage/streamingload/job.rb', line 199

# Convenience entry point: builds an instance with the given arguments and
# immediately calls the instance-level #open on it, forwarding the block.
#
# Defined with +self+ rather than the explicit class name so the definition
# does not have to be edited if the class is ever renamed.
# NOTE(review): this relies on the definition living inside the
# DataConnection class body — confirm against the surrounding file.
#
# @param ds data source handed to the constructor
# @param log_table load log table name handed to the constructor
# @param logger logger handed to the constructor (defaults to +ds.logger+)
# @return whatever the instance-level #open returns
def self.open(ds, log_table:, logger: ds.logger, &block)
  new(ds, log_table: log_table, logger: logger).open(&block)
end

Instance Method Details

#execute_copy(dest_table, manifest, options) ⇒ Object



243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
# File 'lib/bricolage/streamingload/job.rb', line 243

# Builds a COPY statement for +dest_table+ from the manifest, runs it on the
# current connection and logs the manifest URL once it has succeeded.
#
# The statement text is stripped and every run of whitespace is collapsed to
# a single space before execution (this also applies to whitespace inside the
# interpolated values).
#
# @param dest_table table name interpolated right after +copy+
# @param manifest object responding to +url+ and +credential_string+; both
#   values are passed through the +s+ helper before interpolation
# @param options extra text interpolated verbatim before the final +;+
# @raise [JobError] when a JobFailure message mentions +stl_load_errors+
# @raise [JobFailure] re-raised unchanged for any other JobFailure
def execute_copy(dest_table, manifest, options)
  copy_sql = <<-EndSQL.strip.gsub(/\s+/, ' ')
      copy #{dest_table}
      from #{s manifest.url}
      credentials #{s manifest.credential_string}
      manifest
      statupdate false
      compupdate false
      #{options}
      ;
  EndSQL
  @connection.execute(copy_sql)
  @logger.info "load succeeded: #{manifest.url}"
rescue JobFailure => failure
  # Anything that does not point at stl_load_errors is passed on untouched.
  raise unless failure.message.match?(/stl_load_errors/)

  # A load error of this kind cannot be resolved by retrying, so give up now.
  raise JobError, failure.message
end

#get_job_status(job_id) ⇒ Object



219
220
221
222
# File 'lib/bricolage/streamingload/job.rb', line 219

# Reports whether the load log table holds at least one row for +job_id+.
#
# @param job_id value interpolated verbatim into the +where job_id =+ clause
# @return [String] 'success' when the row count (converted with +to_i+) is
#   greater than zero, 'failure' otherwise
def get_job_status(job_id)
  count_sql = "select count(*) from #{@log_table} where job_id = #{job_id}"
  logged_rows = @connection.query_value(count_sql).to_i
  return 'success' if logged_rows > 0

  'failure'
end

#load_objects(dest_table, manifest, options, log) ⇒ Object



236
237
238
239
240
241
# File 'lib/bricolage/streamingload/job.rb', line 236

# Runs the COPY and the load-log insert inside one transaction block on the
# current connection.
#
# @param dest_table passed through to #execute_copy
# @param manifest passed through to #execute_copy
# @param options passed through to #execute_copy
# @param log passed through to #write_load_log
# @return whatever +@connection.transaction+ returns
def load_objects(dest_table, manifest, options, log)
  @connection.transaction do |_txn|
    execute_copy(dest_table, manifest, options)
    write_load_log(log)
  end
end

#load_with_work_table(work_table, manifest, options, sql_source, log) ⇒ Object



224
225
226
227
228
229
230
231
232
233
234
# File 'lib/bricolage/streamingload/job.rb', line 224

# Loads through a work table inside one transaction block: empties the work
# table, copies the manifest's objects into it, runs +sql_source+, records
# the load log entry and finally calls +truncate_and_commit+ on the
# transaction object for the work table.
#
# @param work_table name of the work table
# @param manifest passed through to #execute_copy
# @param options passed through to #execute_copy
# @param sql_source SQL text executed as-is after the copy
# @param log passed through to #write_load_log
# @return whatever +@connection.transaction+ returns
def load_with_work_table(work_table, manifest, options, sql_source, log)
  @connection.transaction do |txn|
    # The transaction finishes by truncating the work table, so from the
    # second run onwards this DELETE has nothing to remove; its cost is not
    # a concern.
    clear_work_table_sql = "delete from #{work_table}"
    @connection.execute(clear_work_table_sql)
    execute_copy(work_table, manifest, options)
    @connection.execute(sql_source)
    write_load_log(log)
    txn.truncate_and_commit(work_table)
  end
end

#open(&block) ⇒ Object



210
211
212
213
214
215
216
217
# File 'lib/bricolage/streamingload/job.rb', line 210

# Opens the data source, remembers the connection it yields in @connection
# and then yields this object to the caller's block.
#
# @yield [self] this object, for the duration of the data source's block
# @return whatever +@ds.open+ returns
# @raise [DataConnectionFailed] when a ConnectionError is raised while the
#   data source is open; the original message is embedded in the new one
def open(&block)
  @ds.open do |raw_connection|
    @connection = raw_connection
    yield self
  end
rescue ConnectionError => conn_error
  raise DataConnectionFailed, "data connection failed: #{conn_error.message}"
end

#write_load_log(log) ⇒ Object



263
264
265
# File 'lib/bricolage/streamingload/job.rb', line 263

# Inserts one row into the load log table for the given log entry, using
# +current_timestamp+ as the finish time.
#
# @param log object responding to +task_id+ and +job_id+; both values are
#   interpolated verbatim into the statement
# @return whatever +@connection.execute+ returns
def write_load_log(log)
  insert_sql = "insert into #{@log_table} (task_id, job_id, finish_time) values (#{log.task_id}, #{log.job_id}, current_timestamp)"
  @connection.execute(insert_sql)
end