Class: Bricolage::StreamingLoad::Job::ControlConnection

Inherits:
Object
  • Object
show all
Includes:
SQLUtils
Defined in:
lib/bricolage/streamingload/job.rb

Defined Under Namespace

Classes: JobInfo, TaskInfo

Constant Summary collapse

MAX_MESSAGE_LENGTH =
1000

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(ds, logger = ds.logger) ⇒ ControlConnection

Returns a new instance of ControlConnection.



278
279
280
281
# File 'lib/bricolage/streamingload/job.rb', line 278

# Creates an unopened control connection wrapper.
#
# @param ds the data source used later by #open to obtain a connection
# @param logger logger to keep for this connection; defaults to the data
#   source's own logger
def initialize(ds, logger = ds.logger)
  @ds = ds
  # Keep the logger instead of silently discarding the argument.
  @logger = logger
  # Set by #open once the data source yields a live connection.
  @connection = nil
end

Class Method Details

.open(ds, logger = ds.logger, &block) ⇒ Object



274
275
276
# File 'lib/bricolage/streamingload/job.rb', line 274

# Builds a ControlConnection for the data source and opens it right away,
# forwarding the caller's block to the instance-level #open.
#
# @param ds data source handed to the constructor
# @param logger logger handed to the constructor (defaults to ds.logger)
# @return whatever the instance-level #open returns
def ControlConnection.open(ds, logger = ds.logger, &block)
  control = new(ds, logger)
  control.open(&block)
end

Instance Method Details

#abort_job(job_id, status, message) ⇒ Object



422
423
424
# File 'lib/bricolage/streamingload/job.rb', line 422

# Records the outcome of a job that did not complete successfully.
# This is a thin wrapper that delegates to #write_job_result.
#
# @param job_id ID of the job row to update
# @param status status value to store
# @param message message to store with the status
def abort_job(job_id, status, message)
  write_job_result job_id, status, message
end

#begin_job(task_id, process_id, force) ⇒ Object



391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
# File 'lib/bricolage/streamingload/job.rb', line 391

# Inserts a new 'running' row into strload_jobs for the given task.
#
# The insert selects from strload_tasks, so nothing is inserted when the task
# does not exist. Unless +force+ is truthy, nothing is inserted either when
# the task already has a job with status 'success'.
#
# @param task_id ID of the task to start a job for (interpolated as-is)
# @param process_id process identifier, passed through the `s` helper
#   (presumably the SQL string quoting helper from SQLUtils)
# @param force when truthy, skip the "no previous success" condition
# @return [Integer, nil] the new job_id, or nil when the query returned no value
def begin_job(task_id, process_id, force)
  job_id = @connection.query_value(<<-EndSQL)
    insert into strload_jobs
        ( task_id
        , process_id
        , status
        , start_time
        )
    select
        task_id
        , #{s process_id}
        , 'running'
        , current_timestamp
    from
        strload_tasks
    where
        task_id = #{task_id}
        and (#{force ? 'true' : 'false'} or task_id not in (select task_id from strload_jobs where status = 'success'))
    returning job_id
    ;
  EndSQL
  job_id ? job_id.to_i : nil
end

#commit_duplicated_job(task_id, process_id) ⇒ Object



458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
# File 'lib/bricolage/streamingload/job.rb', line 458

# Inserts an already-finished job row with status 'duplicated' for the task.
# start_time and finish_time are both set to current_timestamp and the
# message is set to the empty string.
#
# @param task_id ID of the task (interpolated as-is)
# @param process_id process identifier, passed through the `s` helper
#   (presumably the SQL string quoting helper from SQLUtils)
# @return the job_id exactly as returned by the connection (not converted
#   to Integer, unlike #begin_job)
def commit_duplicated_job(task_id, process_id)
  job_id = @connection.query_value(<<-EndSQL)
    insert into strload_jobs
        ( task_id
        , process_id
        , status
        , start_time
        , finish_time
        , message
        )
    select
        #{task_id}
        , #{s process_id}
        , 'duplicated'
        , current_timestamp
        , current_timestamp
        , ''
    returning job_id
    ;
  EndSQL
  job_id
end

#commit_job(job_id, message = nil) ⇒ Object



415
416
417
418
419
420
# File 'lib/bricolage/streamingload/job.rb', line 415

# Marks a job as successful and flags its objects as loaded, with both
# updates issued inside a single @connection.transaction block.
#
# @param job_id ID of the job to commit
# @param message optional result message; nil is stored as an empty string
# @return whatever @connection.transaction returns
def commit_job(job_id, message = nil)
  result_message = message || ''
  @connection.transaction do |_txn|
    write_job_result(job_id, 'success', result_message)
    update_loaded_flag(job_id)
  end
end

#fix_job_status(job_id, status) ⇒ Object



377
378
379
380
381
382
383
384
385
386
387
388
389
# File 'lib/bricolage/streamingload/job.rb', line 377

# Replaces the status of a job whose status is currently 'unknown' and
# prefixes its message with 'status fixed: '. Jobs in any other status are
# left untouched by the WHERE clause.
#
# @param job_id ID of the job to fix (interpolated as-is)
# @param status new status, passed through the `s` helper
#   (presumably the SQL string quoting helper from SQLUtils)
# @return whatever @connection.update returns
def fix_job_status(job_id, status)
  @connection.update(<<-EndSQL)
    update
        strload_jobs
    set
        status = #{s status}
        , message = 'status fixed: ' || message
    where
        job_id = #{job_id}
        and status = 'unknown'
    ;
  EndSQL
end

#load_jobs(task_id) ⇒ Object



344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
# File 'lib/bricolage/streamingload/job.rb', line 344

# Loads the jobs recorded for a task, ordered by start_time.
#
# @param task_id ID of the task (interpolated as-is)
# @return [Array<JobInfo>] one JobInfo per row, built from the row's job_id
#   (converted with to_i) and status
def load_jobs(task_id)
  records = @connection.query_rows(<<-EndSQL)
    select
        job_id
        , status
    from
        strload_jobs
    where
        task_id = #{task_id}
    order by
        start_time
    ;
  EndSQL
  records.map {|rec| JobInfo.new(rec['job_id'].to_i, rec['status']) }
end

#load_object_urls(task_id) ⇒ Object



362
363
364
365
366
367
368
369
370
371
372
373
374
375
# File 'lib/bricolage/streamingload/job.rb', line 362

# Loads the object URLs attached to a task by joining strload_tasks,
# strload_task_objects and strload_objects.
#
# @param task_id ID of the task (interpolated as-is)
# @return the values returned by @connection.query_values
def load_object_urls(task_id)
  @connection.query_values(<<-EndSQL)
    select
        o.object_url
    from
        strload_tasks t
        inner join strload_task_objects tob using (task_id)
        inner join strload_objects o using (object_id)
    where
        t.task_id = #{task_id}
    ;
  EndSQL
end

#load_task(task_id) ⇒ Object



319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
# File 'lib/bricolage/streamingload/job.rb', line 319

# Loads a task together with its target table, object URLs and jobs.
#
# @param task_id ID of the task (interpolated as-is)
# @return [TaskInfo] built from the task row plus #load_object_urls and
#   #load_jobs for the same task
# @raise [JobError] when the query returns no row for the task
def load_task(task_id)
  rec = @connection.query_row(<<-EndSQL)
    select
        tsk.task_class
        , tbl.schema_name
        , tbl.table_name
        , tbl.disabled
    from
        strload_tasks tsk
        inner join strload_tables tbl using (table_id)
    where
        tsk.task_id = #{task_id}
    ;
  EndSQL
  raise JobError, "no such task: #{task_id}" unless rec

  TaskInfo.new(
    task_id,
    rec['task_class'],
    rec['schema_name'],
    rec['table_name'],
    # Any value other than the string 'f' counts as disabled.
    (rec['disabled'] != 'f'),
    load_object_urls(task_id),
    load_jobs(task_id)
  )
end

#open(&block) ⇒ Object



283
284
285
286
287
288
289
290
# File 'lib/bricolage/streamingload/job.rb', line 283

# Opens a connection through the data source, remembers it in @connection,
# and yields this object to the caller's block.
#
# @yield [self] this control connection, once @connection is set
# @return whatever @ds.open returns
# @raise [ControlConnectionFailed] when a ConnectionError is raised anywhere
#   inside this method, including from within the yielded block
def open(&block)
  begin
    @ds.open do |connection|
      @connection = connection
      yield self
    end
  rescue ConnectionError => err
    raise ControlConnectionFailed, "control connection failed: #{err.message}"
  end
end

#update_loaded_flag(job_id) ⇒ Object



440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
# File 'lib/bricolage/streamingload/job.rb', line 440

# Sets loaded = true on every strload_objects row that belongs to the task
# of the given job (resolved through strload_jobs and strload_task_objects).
#
# @param job_id ID of the job whose task's objects are flagged (interpolated as-is)
# @return whatever @connection.execute returns
def update_loaded_flag(job_id)
  @connection.execute(<<-EndSQL)
    update
        strload_objects
    set
        loaded = true
    where
        object_id in (
          select
              object_id
          from
              strload_task_objects
          where task_id = (select task_id from strload_jobs where job_id = #{job_id})
        )
    ;
  EndSQL
end

#write_job_result(job_id, status, message) ⇒ Object



428
429
430
431
432
433
434
435
436
437
438
# File 'lib/bricolage/streamingload/job.rb', line 428

# Stores the final status, finish time (current_timestamp) and message of a job.
# The message is cut to at most MAX_MESSAGE_LENGTH characters before quoting.
#
# @param job_id ID of the job to update (interpolated as-is)
# @param status status value, passed through the `s` helper
#   (presumably the SQL string quoting helper from SQLUtils)
# @param message result message; must respond to [] (nil is not accepted)
# @return whatever @connection.execute returns
def write_job_result(job_id, status, message)
  @connection.execute(<<-EndSQL)
    update
        strload_jobs
    set
        (status, finish_time, message) = (#{s status}, current_timestamp, #{s message[0, MAX_MESSAGE_LENGTH]})
    where
        job_id = #{job_id}
    ;
  EndSQL
end