Class: Bricolage::StreamingLoad::Job::ControlConnection

Inherits:
Object
  • Object
show all
Includes:
SQLUtils
Defined in:
lib/bricolage/streamingload/job.rb

Defined Under Namespace

Classes: JobInfo, TaskInfo

Constant Summary collapse

MAX_MESSAGE_LENGTH =
1000

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(ds, logger = ds.logger) ⇒ ControlConnection

Returns a new instance of ControlConnection.



278
279
280
281
# File 'lib/bricolage/streamingload/job.rb', line 278

# Builds an unopened control connection bound to the data source +ds+.
#
# ds     :: data source; it is stored, and #open later calls +ds.open+.
# logger :: accepted for interface compatibility; this constructor does not
#           store it (its default, +ds.logger+, is still evaluated).
#
# The underlying connection stays nil until #open assigns it.
def initialize(ds, logger = ds.logger)
  @ds, @connection = ds, nil
end

Class Method Details

.open(ds, logger = ds.logger, &block) ⇒ Object



274
275
276
# File 'lib/bricolage/streamingload/job.rb', line 274

# Convenience constructor: creates a ControlConnection for +ds+ and
# immediately opens it, handing +block+ to the instance-level #open.
# Returns whatever the instance-level #open returns.
def ControlConnection.open(ds, logger = ds.logger, &block)
  control = new(ds, logger)
  control.open(&block)
end

Instance Method Details

#abort_job(job_id, status, message) ⇒ Object



422
423
424
# File 'lib/bricolage/streamingload/job.rb', line 422

# Records the outcome of a job that did not succeed by delegating to
# #write_job_result with the given +status+ and +message+.
# Returns whatever #write_job_result returns.
def abort_job(job_id, status, message)
  write_job_result job_id, status, message
end

#begin_job(task_id, process_id, force) ⇒ Object



391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
# File 'lib/bricolage/streamingload/job.rb', line 391

# Inserts a 'running' row into strload_jobs for +task_id+ and returns the
# new job id.
#
# task_id    :: interpolated into the SQL as-is (expected to be numeric).
# process_id :: passed through +s+ before interpolation (presumably the
#               SQL string-literal helper from SQLUtils -- confirm).
# force      :: when truthy, the row is inserted even if a job with status
#               'success' already exists for the task.
#
# Returns the job id as an Integer, or nil when the query yields no value
# (no row was inserted).
def begin_job(task_id, process_id, force)
  force_literal = force ? 'true' : 'false'
  sql = <<-EndSQL
    insert into strload_jobs
        ( task_id
        , process_id
        , status
        , start_time
        )
    select
        task_id
        , #{s(process_id)}
        , 'running'
        , current_timestamp
    from
        strload_tasks
    where
        task_id = #{task_id}
        and (#{force_literal} or task_id not in (select task_id from strload_jobs where status = 'success'))
    returning job_id
    ;
  EndSQL
  new_job_id = @connection.query_value(sql)
  return nil unless new_job_id
  new_job_id.to_i
end

#commit_duplicated_job(task_id, process_id) ⇒ Object



458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
# File 'lib/bricolage/streamingload/job.rb', line 458

# Inserts an already-finished strload_jobs row with status 'duplicated'
# for +task_id+; start_time and finish_time are both current_timestamp and
# the message is empty.
#
# Returns the value produced by +query_value+ for "returning job_id"
# unchanged (unlike #begin_job, it is not converted with to_i).
def commit_duplicated_job(task_id, process_id)
  sql = <<-EndSQL
    insert into strload_jobs
        ( task_id
        , process_id
        , status
        , start_time
        , finish_time
        , message
        )
    select
        #{task_id}
        , #{s(process_id)}
        , 'duplicated'
        , current_timestamp
        , current_timestamp
        , ''
    returning job_id
    ;
  EndSQL
  @connection.query_value(sql)
end

#commit_job(job_id, message = nil) ⇒ Object



415
416
417
418
419
420
# File 'lib/bricolage/streamingload/job.rb', line 415

# Marks +job_id+ as successful inside a single @connection.transaction
# block: first writes the 'success' result (a nil +message+ becomes ''),
# then flags the task's objects as loaded.
# Returns whatever @connection.transaction returns.
def commit_job(job_id, message = nil)
  result_message = message || ''
  @connection.transaction do |_txn|
    write_job_result(job_id, 'success', result_message)
    update_loaded_flag(job_id)
  end
end

#fix_job_status(job_id, status) ⇒ Object



377
378
379
380
381
382
383
384
385
386
387
388
389
# File 'lib/bricolage/streamingload/job.rb', line 377

# Rewrites the status of job +job_id+ to +status+, but only when the row's
# current status is 'unknown'; the stored message is prefixed with
# 'status fixed: '.
# Returns whatever @connection.update returns.
def fix_job_status(job_id, status)
  sql = <<-EndSQL
    update
        strload_jobs
    set
        status = #{s(status)}
        , message = 'status fixed: ' || message
    where
        job_id = #{job_id}
        and status = 'unknown'
    ;
  EndSQL
  @connection.update(sql)
end

#load_jobs(task_id) ⇒ Object



344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
# File 'lib/bricolage/streamingload/job.rb', line 344

# Fetches the jobs recorded for +task_id+, ordered by start_time.
# Returns an Array of JobInfo built from each row's 'job_id' (converted
# with to_i) and 'status'.
def load_jobs(task_id)
  sql = <<-EndSQL
    select
        job_id
        , status
    from
        strload_jobs
    where
        task_id = #{task_id}
    order by
        start_time
    ;
  EndSQL
  rows = @connection.query_rows(sql)
  rows.map do |row|
    JobInfo.new(row['job_id'].to_i, row['status'])
  end
end

#load_object_urls(task_id) ⇒ Object



362
363
364
365
366
367
368
369
370
371
372
373
374
375
# File 'lib/bricolage/streamingload/job.rb', line 362

# Looks up the object URLs attached to +task_id+ by joining strload_tasks,
# strload_task_objects and strload_objects.
# Returns the result of @connection.query_values unchanged.
def load_object_urls(task_id)
  sql = <<-EndSQL
    select
        o.object_url
    from
        strload_tasks t
        inner join strload_task_objects tob using (task_id)
        inner join strload_objects o using (object_id)
    where
        t.task_id = #{task_id}
    ;
  EndSQL
  @connection.query_values(sql)
end

#load_task(task_id) ⇒ Object



319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
# File 'lib/bricolage/streamingload/job.rb', line 319

# Loads the task +task_id+ together with its target table, object URLs
# and job history.
#
# Returns a TaskInfo built from the task row, #load_object_urls and
# #load_jobs. The "disabled" field is true for any 'disabled' column value
# other than the string 'f'.
#
# Raises JobError ("no such task: <task_id>") when the query returns no row.
def load_task(task_id)
  sql = <<-EndSQL
    select
        tsk.task_class
        , tbl.schema_name
        , tbl.table_name
        , tbl.disabled
    from
        strload_tasks tsk
        inner join strload_tables tbl using (table_id)
    where
        tsk.task_id = #{task_id}
    ;
  EndSQL
  row = @connection.query_row(sql)
  raise JobError, "no such task: #{task_id}" unless row

  disabled = row['disabled'] != 'f'
  TaskInfo.new(
    task_id,
    row['task_class'],
    row['schema_name'],
    row['table_name'],
    disabled,
    load_object_urls(task_id),
    load_jobs(task_id)
  )
end

#open(&block) ⇒ Object



283
284
285
286
287
288
289
290
# File 'lib/bricolage/streamingload/job.rb', line 283

# Opens the data source, remembers the yielded connection in @connection
# and yields this object to the caller's block.
#
# Any ConnectionError raised while the data source is open -- including
# from inside the caller's block -- is re-raised as ControlConnectionFailed
# with the original message appended.
# Returns whatever @ds.open returns.
def open(&block)
  begin
    @ds.open do |conn|
      @connection = conn
      yield self
    end
  rescue ConnectionError => err
    raise ControlConnectionFailed, "control connection failed: #{err.message}"
  end
end

#update_loaded_flag(job_id) ⇒ Object



440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
# File 'lib/bricolage/streamingload/job.rb', line 440

# Sets loaded = true on every strload_objects row that belongs to the task
# of job +job_id+ (resolved through strload_jobs and strload_task_objects).
# Returns whatever @connection.execute returns.
def update_loaded_flag(job_id)
  sql = <<-EndSQL
    update
        strload_objects
    set
        loaded = true
    where
        object_id in (
          select
              object_id
          from
              strload_task_objects
          where task_id = (select task_id from strload_jobs where job_id = #{job_id})
        )
    ;
  EndSQL
  @connection.execute(sql)
end

#write_job_result(job_id, status, message) ⇒ Object



428
429
430
431
432
433
434
435
436
437
438
# File 'lib/bricolage/streamingload/job.rb', line 428

# Stores the final +status+, the finish time (current_timestamp) and
# +message+ on the strload_jobs row identified by +job_id+.
#
# message :: truncated to MAX_MESSAGE_LENGTH characters before being
#            written. It is coerced with to_s first, so a nil message
#            (e.g. forwarded unchanged by #abort_job) is stored as ''
#            instead of raising NoMethodError while recording the result.
#
# Returns whatever @connection.execute returns.
def write_job_result(job_id, status, message)
  truncated = message.to_s[0, MAX_MESSAGE_LENGTH]
  @connection.execute(<<-EndSQL)
    update
        strload_jobs
    set
        (status, finish_time, message) = (#{s status}, current_timestamp, #{s truncated})
    where
        job_id = #{job_id}
    ;
  EndSQL
end