Class: Bricolage::StreamingLoad::Job::ControlConnection
- Inherits:
-
Object
- Object
- Bricolage::StreamingLoad::Job::ControlConnection
show all
- Includes:
- SQLUtils
- Defined in:
- lib/bricolage/streamingload/job.rb
Defined Under Namespace
Classes: JobInfo, TaskInfo
Constant Summary
collapse
- MAX_MESSAGE_LENGTH =
1000
Class Method Summary
collapse
Instance Method Summary
collapse
-
#abort_job(job_id, status, message) ⇒ Object
-
#begin_job(task_id, process_id, force) ⇒ Object
-
#commit_duplicated_job(task_id, process_id) ⇒ Object
-
#commit_job(job_id, message = nil) ⇒ Object
-
#fix_job_status(job_id, status) ⇒ Object
-
#initialize(ds, logger = ds.logger) ⇒ ControlConnection
constructor
A new instance of ControlConnection.
-
#load_jobs(task_id) ⇒ Object
-
#load_object_urls(task_id) ⇒ Object
-
#load_task(task_id) ⇒ Object
-
#open(&block) ⇒ Object
-
#update_loaded_flag(job_id) ⇒ Object
-
#write_job_result(job_id, status, message) ⇒ Object
Constructor Details
#initialize(ds, logger = ds.logger) ⇒ ControlConnection
Returns a new instance of ControlConnection.
278
279
280
281
|
# File 'lib/bricolage/streamingload/job.rb', line 278
# Prepares a control connection wrapper for the given data source.
# No connection is established here: @connection starts out as nil.
#
# @param ds [Object] data source; must respond to #logger when +logger+
#   is omitted, because the default value is taken from it
# @param logger [Object] accepted by the signature but not stored or used
#   by this constructor
def initialize(ds, logger = ds.logger)
  @connection = nil
  @ds = ds
end
|
Class Method Details
.open(ds, logger = ds.logger, &block) ⇒ Object
274
275
276
|
# File 'lib/bricolage/streamingload/job.rb', line 274
# Convenience entry point: builds a new instance for +ds+ and immediately
# calls its instance-level #open, forwarding the given block.
#
# @param ds [Object] data source handed to the constructor
# @param logger [Object] handed to the constructor; defaults to ds.logger
# @return [Object] whatever the instance-level #open returns
def ControlConnection.open(ds, logger = ds.logger, &block)
  control = new(ds, logger)
  control.open(&block)
end
|
Instance Method Details
#abort_job(job_id, status, message) ⇒ Object
422
423
424
|
# File 'lib/bricolage/streamingload/job.rb', line 422
# Records the outcome of a job that did not complete successfully.
# This is a plain delegation to #write_job_result with the same arguments.
#
# @param job_id [Object] identifier of the job row to update
# @param status [Object] status value to record
# @param message [Object] message to record alongside the status
# @return [Object] the result of #write_job_result
def abort_job(job_id, status, message)
  write_job_result job_id, status, message
end
|
#begin_job(task_id, process_id, force) ⇒ Object
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
|
# File 'lib/bricolage/streamingload/job.rb', line 391
# Registers a new job in strload_jobs with status 'running' for the given task.
#
# The insert selects from strload_tasks, so it only produces a row when the
# task exists. Unless +force+ is truthy, the insert is also skipped when the
# task already has a job whose status is 'success'.
#
# @param task_id [Object] task identifier, interpolated into the SQL as-is
# @param process_id [Object] process identifier, passed through the +s+ helper
#   before interpolation
# @param force [Boolean] when truthy, bypasses the "already succeeded" check
# @return [Integer, nil] the new job_id converted with #to_i, or nil when
#   query_value returned a falsy value (presumably: no row was inserted)
def begin_job(task_id, process_id, force)
  job_id = @connection.query_value(<<-EndSQL)
      insert into strload_jobs
          ( task_id
          , process_id
          , status
          , start_time
          )
      select
          task_id
          , #{s process_id}
          , 'running'
          , current_timestamp
      from
          strload_tasks
      where
          task_id = #{task_id}
          and (#{force ? 'true' : 'false'} or task_id not in (select task_id from strload_jobs where status = 'success'))
      returning job_id
      ;
  EndSQL
  job_id ? job_id.to_i : nil
end
|
#commit_duplicated_job(task_id, process_id) ⇒ Object
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
|
# File 'lib/bricolage/streamingload/job.rb', line 458
# Inserts an already-finished job row with status 'duplicated' for the task.
# start_time and finish_time are both set to current_timestamp and the
# message is the empty string.
#
# @param task_id [Object] task identifier, interpolated into the SQL as-is
# @param process_id [Object] process identifier, passed through the +s+ helper
#   before interpolation
# @return [Object] the job_id exactly as returned by query_value (no Integer
#   conversion is applied here)
def commit_duplicated_job(task_id, process_id)
  @connection.query_value(<<-EndSQL)
      insert into strload_jobs
          ( task_id
          , process_id
          , status
          , start_time
          , finish_time
          , message
          )
      select
          #{task_id}
          , #{s process_id}
          , 'duplicated'
          , current_timestamp
          , current_timestamp
          , ''
      returning job_id
      ;
  EndSQL
end
|
#commit_job(job_id, message = nil) ⇒ Object
415
416
417
418
419
420
|
# File 'lib/bricolage/streamingload/job.rb', line 415
# Marks a job as successful and flags its objects as loaded, with both
# steps issued inside one @connection.transaction block.
#
# @param job_id [Object] identifier of the job to commit
# @param message [String, nil] result message; a falsy value is recorded
#   as the empty string
# @return [Object] whatever @connection.transaction returns
def commit_job(job_id, message = nil)
  result_message = message || ''
  @connection.transaction do |_txn|
    write_job_result(job_id, 'success', result_message)
    update_loaded_flag(job_id)
  end
end
|
#fix_job_status(job_id, status) ⇒ Object
377
378
379
380
381
382
383
384
385
386
387
388
389
|
# File 'lib/bricolage/streamingload/job.rb', line 377
# Rewrites the status of a job whose current status is 'unknown'.
# The existing message is kept and prefixed with 'status fixed: '.
# Rows whose status is anything other than 'unknown' are excluded by the
# where clause.
#
# @param job_id [Object] job identifier, interpolated into the SQL as-is
# @param status [Object] new status, passed through the +s+ helper before
#   interpolation
# @return [Object] whatever @connection.update returns
def fix_job_status(job_id, status)
  @connection.update(<<-EndSQL)
      update
          strload_jobs
      set
          status = #{s status}
          , message = 'status fixed: ' || message
      where
          job_id = #{job_id}
          and status = 'unknown'
      ;
  EndSQL
end
|
#load_jobs(task_id) ⇒ Object
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
|
# File 'lib/bricolage/streamingload/job.rb', line 344
# Fetches the jobs recorded for a task, ordered by start_time, and wraps
# each row in a JobInfo.
#
# @param task_id [Object] task identifier, interpolated into the SQL as-is
# @return [Array<JobInfo>] one entry per row; job_id is converted with #to_i,
#   status is passed through unchanged
def load_jobs(task_id)
  records = @connection.query_rows(<<-EndSQL)
      select
          job_id
          , status
      from
          strload_jobs
      where
          task_id = #{task_id}
      order by
          start_time
      ;
  EndSQL
  records.map { |rec| JobInfo.new(rec['job_id'].to_i, rec['status']) }
end
|
#load_object_urls(task_id) ⇒ Object
362
363
364
365
366
367
368
369
370
371
372
373
374
375
|
# File 'lib/bricolage/streamingload/job.rb', line 362
# Fetches the object URLs attached to a task by joining strload_tasks,
# strload_task_objects and strload_objects.
#
# @param task_id [Object] task identifier, interpolated into the SQL as-is
# @return [Object] the value returned by @connection.query_values
#   (presumably an Array of URL strings; confirm against the connection class)
def load_object_urls(task_id)
  @connection.query_values(<<-EndSQL)
      select
          o.object_url
      from
          strload_tasks t
          inner join strload_task_objects tob using (task_id)
          inner join strload_objects o using (object_id)
      where
          t.task_id = #{task_id}
      ;
  EndSQL
end
|
#load_task(task_id) ⇒ Object
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
|
# File 'lib/bricolage/streamingload/job.rb', line 319
# Loads a task together with its target table, object URLs and jobs.
#
# @param task_id [Object] task identifier, interpolated into the SQL as-is
# @return [TaskInfo] built from the task row plus the results of
#   #load_object_urls and #load_jobs
# @raise [JobError] when query_row returns a falsy value for the task
def load_task(task_id)
  rec = @connection.query_row(<<-EndSQL)
      select
          tsk.task_class
          , tbl.schema_name
          , tbl.table_name
          , tbl.disabled
      from
          strload_tasks tsk
          inner join strload_tables tbl using (table_id)
      where
          tsk.task_id = #{task_id}
      ;
  EndSQL
  raise JobError, "no such task: #{task_id}" unless rec

  TaskInfo.new(
    task_id,
    rec['task_class'],
    rec['schema_name'],
    rec['table_name'],
    # Any value other than the exact string 'f' counts as disabled.
    (rec['disabled'] != 'f'),
    load_object_urls(task_id),
    load_jobs(task_id)
  )
end
|
#open(&block) ⇒ Object
283
284
285
286
287
288
289
290
|
# File 'lib/bricolage/streamingload/job.rb', line 283
# Opens a connection through the data source, keeps it in @connection and
# yields this object to the caller's block.
#
# The rescue clause covers the whole method body, so a ConnectionError
# raised while opening or from inside the yielded block is re-raised as
# ControlConnectionFailed.
#
# @yieldparam [ControlConnection] self, with @connection assigned
# @return [Object] whatever @ds.open returns
# @raise [ControlConnectionFailed] when a ConnectionError is raised
def open(&block)
  @ds.open do |connection|
    @connection = connection
    yield self
  end
rescue ConnectionError => err
  raise ControlConnectionFailed, "control connection failed: #{err.message}"
end
|
#update_loaded_flag(job_id) ⇒ Object
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
|
# File 'lib/bricolage/streamingload/job.rb', line 440
# Sets loaded = true on every strload_objects row attached, through
# strload_task_objects, to the task that the given job belongs to.
#
# @param job_id [Object] job identifier, interpolated into the SQL as-is
# @return [Object] whatever @connection.execute returns
def update_loaded_flag(job_id)
  @connection.execute(<<-EndSQL)
      update
          strload_objects
      set
          loaded = true
      where
          object_id in (
              select
                  object_id
              from
                  strload_task_objects
              where task_id = (select task_id from strload_jobs where job_id = #{job_id})
          )
      ;
  EndSQL
end
|
#write_job_result(job_id, status, message) ⇒ Object
428
429
430
431
432
433
434
435
436
437
438
|
# File 'lib/bricolage/streamingload/job.rb', line 428
# Stores the final status, finish time (current_timestamp) and message of
# a job in strload_jobs.
#
# The message is cut to at most MAX_MESSAGE_LENGTH characters before it is
# written. It is converted with #to_s first, so a nil message is stored as
# an empty string rather than raising on nil.
#
# @param job_id [Object] job identifier, interpolated into the SQL as-is
# @param status [Object] status value, passed through the +s+ helper
# @param message [#to_s, nil] result message
# @return [Object] whatever @connection.execute returns
def write_job_result(job_id, status, message)
  truncated = message.to_s[0, MAX_MESSAGE_LENGTH]
  @connection.execute(<<-EndSQL)
      update
          strload_jobs
      set
          (status, finish_time, message) = (#{s status}, current_timestamp, #{s truncated})
      where
          job_id = #{job_id}
      ;
  EndSQL
end
|