Class: Bricolage::StreamingLoad::Job::ControlConnection
- Inherits: Object
- Inheritance hierarchy:
  - Object
  - Bricolage::StreamingLoad::Job::ControlConnection
- Includes:
- SQLUtils
- Defined in:
- lib/bricolage/streamingload/job.rb
Defined Under Namespace
Constant Summary collapse
- MAX_MESSAGE_LENGTH = 1000
Class Method Summary collapse
Instance Method Summary collapse
- #abort_job(job_id, status, message) ⇒ Object
- #begin_job(task_id, process_id, force) ⇒ Object
- #commit_duplicated_job(task_id, process_id) ⇒ Object
- #commit_job(job_id, message = nil) ⇒ Object
- #fix_job_status(job_id, status) ⇒ Object
- #initialize(ds, logger = ds.logger) ⇒ ControlConnection (constructor) — A new instance of ControlConnection.
- #load_jobs(task_id) ⇒ Object
- #load_object_urls(task_id) ⇒ Object
- #load_task(task_id) ⇒ Object
- #open(&block) ⇒ Object
- #update_loaded_flag(job_id) ⇒ Object
- #write_job_result(job_id, status, message) ⇒ Object
Constructor Details
#initialize(ds, logger = ds.logger) ⇒ ControlConnection
Returns a new instance of ControlConnection.
267 268 269 270 |
# File 'lib/bricolage/streamingload/job.rb', line 267 def initialize(ds, logger = ds.logger) @ds = ds @connection = nil end |
Class Method Details
.open(ds, logger = ds.logger, &block) ⇒ Object
263 264 265 |
# File 'lib/bricolage/streamingload/job.rb', line 263 def ControlConnection.open(ds, logger = ds.logger, &block) new(ds, logger).open(&block) end |
Instance Method Details
#abort_job(job_id, status, message) ⇒ Object
411 412 413 |
# File 'lib/bricolage/streamingload/job.rb', line 411 def abort_job(job_id, status, message) write_job_result(job_id, status, message) end |
#begin_job(task_id, process_id, force) ⇒ Object
380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 |
# File 'lib/bricolage/streamingload/job.rb', line 380 def begin_job(task_id, process_id, force) job_id = @connection.query_value(<<-EndSQL) insert into strload_jobs ( task_id , process_id , status , start_time ) select task_id , #{s process_id} , 'running' , current_timestamp from strload_tasks where task_id = #{task_id} and (#{force ? 'true' : 'false'} or task_id not in (select task_id from strload_jobs where status = 'success')) returning job_id ; EndSQL return job_id ? job_id.to_i : nil end |
#commit_duplicated_job(task_id, process_id) ⇒ Object
447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 |
# File 'lib/bricolage/streamingload/job.rb', line 447 def commit_duplicated_job(task_id, process_id) job_id = @connection.query_value(<<-EndSQL) insert into strload_jobs ( task_id , process_id , status , start_time , finish_time , message ) select #{task_id} , #{s process_id} , 'duplicated' , current_timestamp , current_timestamp , '' returning job_id ; EndSQL return job_id end |
#commit_job(job_id, message = nil) ⇒ Object
404 405 406 407 408 409 |
# File 'lib/bricolage/streamingload/job.rb', line 404 def commit_job(job_id, message = nil) @connection.transaction {|txn| write_job_result job_id, 'success', (message || '') update_loaded_flag job_id } end |
#fix_job_status(job_id, status) ⇒ Object
366 367 368 369 370 371 372 373 374 375 376 377 378 |
# File 'lib/bricolage/streamingload/job.rb', line 366 def fix_job_status(job_id, status) @connection.update(<<-EndSQL) update strload_jobs set status = #{s status} , message = 'status fixed: ' || message where job_id = #{job_id} and status = 'unknown' ; EndSQL end |
#load_jobs(task_id) ⇒ Object
333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 |
# File 'lib/bricolage/streamingload/job.rb', line 333 def load_jobs(task_id) records = @connection.query_rows(<<-EndSQL) select job_id , status from strload_jobs where task_id = #{task_id} order by start_time ; EndSQL records.map {|rec| JobInfo.new(rec['job_id'].to_i, rec['status']) } end |
#load_object_urls(task_id) ⇒ Object
351 352 353 354 355 356 357 358 359 360 361 362 363 364 |
# File 'lib/bricolage/streamingload/job.rb', line 351 def load_object_urls(task_id) urls = @connection.query_values(<<-EndSQL) select o.object_url from strload_tasks t inner join strload_task_objects tob using (task_id) inner join strload_objects o using (object_id) where t.task_id = #{task_id} ; EndSQL urls end |
#load_task(task_id) ⇒ Object
308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 |
# File 'lib/bricolage/streamingload/job.rb', line 308 def load_task(task_id) rec = @connection.query_row(<<-EndSQL) or raise JobError, "no such task: #{task_id}" select tsk.task_class , tbl.schema_name , tbl.table_name , tbl.disabled from strload_tasks tsk inner join strload_tables tbl using (table_id) where tsk.task_id = #{task_id} ; EndSQL TaskInfo.new( task_id, rec['task_class'], rec['schema_name'], rec['table_name'], (rec['disabled'] != 'f'), load_object_urls(task_id), load_jobs(task_id) ) end |
#open(&block) ⇒ Object
272 273 274 275 276 277 278 279 |
# File 'lib/bricolage/streamingload/job.rb', line 272 def open(&block) @ds.open {|conn| @connection = conn yield self } rescue ConnectionError => ex raise ControlConnectionFailed, "control connection failed: #{ex.message}" end |
#update_loaded_flag(job_id) ⇒ Object
429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 |
# File 'lib/bricolage/streamingload/job.rb', line 429 def update_loaded_flag(job_id) @connection.execute(<<-EndSQL) update strload_objects set loaded = true where object_id in ( select object_id from strload_task_objects where task_id = (select task_id from strload_jobs where job_id = #{job_id}) ) ; EndSQL end |
#write_job_result(job_id, status, message) ⇒ Object
417 418 419 420 421 422 423 424 425 426 427 |
# File 'lib/bricolage/streamingload/job.rb', line 417 def write_job_result(job_id, status, message) @connection.execute(<<-EndSQL) update strload_jobs set (status, finish_time, message) = (#{s status}, current_timestamp, #{s message[0, MAX_MESSAGE_LENGTH]}) where job_id = #{job_id} ; EndSQL end |