Class: Bricolage::StreamingLoad::Job::DataConnection
- Inherits:
-
Object
- Object
- Bricolage::StreamingLoad::Job::DataConnection
- Includes:
- SQLUtils
- Defined in:
- lib/bricolage/streamingload/job.rb
Class Method Summary collapse
Instance Method Summary collapse
- #execute_copy(dest_table, manifest, options) ⇒ Object
- #get_job_status(job_id) ⇒ Object
-
#initialize(ds, log_table:, logger: ds.logger) ⇒ DataConnection
constructor
A new instance of DataConnection.
- #load_objects(dest_table, manifest, options, log) ⇒ Object
- #load_with_work_table(work_table, manifest, options, sql_source, log) ⇒ Object
- #open(&block) ⇒ Object
- #write_load_log(log) ⇒ Object
Constructor Details
#initialize(ds, log_table:, logger: ds.logger) ⇒ DataConnection
Returns a new instance of DataConnection.
203 204 205 206 207 208 |
# File 'lib/bricolage/streamingload/job.rb', line 203
#
# Builds a DataConnection around a data source. Nothing is connected here:
# @connection starts out as nil.
#
# @param ds [Object] data source; must respond to #logger when no explicit
#   logger is given
# @param log_table [String] name of the load-log table (stored as-is)
# @param logger [Object] logger to use; defaults to ds.logger
def initialize(ds, log_table:, logger: ds.logger)
  @ds = ds
  @log_table = log_table
  @logger = logger
  @connection = nil
end
Class Method Details
.open(ds, log_table:, logger: ds.logger, &block) ⇒ Object
199 200 201 |
# File 'lib/bricolage/streamingload/job.rb', line 199
#
# Convenience constructor: builds an instance and immediately calls #open
# on it, forwarding the block.
#
# @param ds [Object] data source passed to .new
# @param log_table [String] load-log table name passed to .new
# @param logger [Object] logger passed to .new; defaults to ds.logger
# @return [Object] whatever the instance-level #open returns
def self.open(ds, log_table:, logger: ds.logger, &block)
  new(ds, log_table: log_table, logger: logger).open(&block)
end
Instance Method Details
#execute_copy(dest_table, manifest, options) ⇒ Object
243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 |
# File 'lib/bricolage/streamingload/job.rb', line 243
#
# Issues a COPY statement that loads the objects named by +manifest+ into
# +dest_table+, then logs the manifest URL on success.
#
# @param dest_table [#to_s] target table name, interpolated into the SQL as-is
# @param manifest [#url, #credential_string] manifest descriptor; both values
#   are passed through the `s` helper (presumably SQL string quoting from
#   SQLUtils -- confirm) before interpolation
# @param options [#to_s] extra COPY options appended verbatim
# @raise [JobError] when the JobFailure message mentions stl_load_errors
# @raise [JobFailure] re-raised unchanged for every other JobFailure
def execute_copy(dest_table, manifest, options)
  # The heredoc is squeezed to a single space-separated line before sending.
  @connection.execute(<<-EndSQL.strip.gsub(/\s+/, ' '))
      copy #{dest_table}
      from #{s manifest.url}
      credentials #{s manifest.credential_string}
      manifest
      statupdate false
      compupdate false
      #{options}
      ;
  EndSQL
  @logger.info "load succeeded: #{manifest.url}"
rescue JobFailure => ex
  if /stl_load_errors/ =~ ex.message
    # We cannot resolve this load error by retry, give up now.
    raise JobError, ex.message
  end
  raise
end
#get_job_status(job_id) ⇒ Object
219 220 221 222 |
# File 'lib/bricolage/streamingload/job.rb', line 219
#
# Reports whether a job has a row in the load-log table.
#
# @param job_id [#to_s] job identifier, interpolated into the SQL unquoted
#   (assumes a numeric id -- confirm against callers)
# @return [String] 'success' when at least one log row matches, otherwise
#   'failure'
def get_job_status(job_id)
  count = @connection.query_value(
    "select count(*) from #{@log_table} where job_id = #{job_id}"
  )
  count.to_i > 0 ? 'success' : 'failure'
end
#load_objects(dest_table, manifest, options, log) ⇒ Object
236 237 238 239 240 241 |
# File 'lib/bricolage/streamingload/job.rb', line 236
#
# Copies the manifest's objects straight into +dest_table+ and records the
# load-log row, both inside a single transaction block.
#
# @param dest_table [#to_s] target table name
# @param manifest [Object] manifest descriptor handed to #execute_copy
# @param options [#to_s] extra COPY options handed to #execute_copy
# @param log [Object] log record handed to #write_load_log
# @return [Object] whatever @connection.transaction returns
def load_objects(dest_table, manifest, options, log)
  @connection.transaction {|_txn|
    execute_copy dest_table, manifest, options
    write_load_log log
  }
end
#load_with_work_table(work_table, manifest, options, sql_source, log) ⇒ Object
224 225 226 227 228 229 230 231 232 233 234 |
# File 'lib/bricolage/streamingload/job.rb', line 224
#
# Loads through an intermediate work table, inside one transaction block:
# empty the work table, COPY into it, run +sql_source+, write the load-log
# row, then call truncate_and_commit on the transaction object.
#
# @param work_table [#to_s] work table name
# @param manifest [Object] manifest descriptor handed to #execute_copy
# @param options [#to_s] extra COPY options handed to #execute_copy
# @param sql_source [Object] statement executed after the COPY
# @param log [Object] log record handed to #write_load_log
# @return [Object] whatever @connection.transaction returns
def load_with_work_table(work_table, manifest, options, sql_source, log)
  @connection.transaction {|txn|
    # NOTE: This transaction ends with truncation, this DELETE does nothing
    # from the second time.  So don't worry about DELETE cost here.
    @connection.execute("delete from #{work_table}")
    execute_copy work_table, manifest, options
    @connection.execute sql_source
    write_load_log log
    txn.truncate_and_commit work_table
  }
end
#open(&block) ⇒ Object
210 211 212 213 214 215 216 217 |
# File 'lib/bricolage/streamingload/job.rb', line 210
#
# Opens the data source, remembers the yielded connection in @connection,
# and yields this object to the caller's block.
#
# @yieldparam self [DataConnection] this object, with @connection assigned
# @return [Object] whatever @ds.open returns
# @raise [DataConnectionFailed] when a ConnectionError escapes @ds.open
#   (including one raised from inside the caller's block)
def open(&block)
  @ds.open {|conn|
    @connection = conn
    yield self
  }
rescue ConnectionError => ex
  raise DataConnectionFailed, "data connection failed: #{ex.message}"
end
#write_load_log(log) ⇒ Object
263 264 265 |
# File 'lib/bricolage/streamingload/job.rb', line 263
#
# Inserts one row into the load-log table, with finish_time set to
# current_timestamp by the database.
#
# @param log [#task_id, #job_id] log record; both ids are interpolated into
#   the SQL unquoted (assumes numeric ids -- confirm against callers)
# @return [Object] whatever @connection.execute returns
def write_load_log(log)
  @connection.execute(
    "insert into #{@log_table} (task_id, job_id, finish_time) " \
    "values (#{log.task_id}, #{log.job_id}, current_timestamp)"
  )
end