Class: BatchKit::Database
- Inherits:
- 
      Object
      
        - Object
- BatchKit::Database
 
- Defined in:
- lib/batch-kit/database.rb,
 lib/batch-kit/database/models.rb,
 lib/batch-kit/database/schema.rb,
 lib/batch-kit/database/log4r_outputter.rb,
 lib/batch-kit/database/java_util_log_handler.rb
Overview
Implements functionality for persisting details of job runs to a relational database, via the Sequel database library.
Defined Under Namespace
Classes: JavaUtilLogHandler, Job, JobRun, JobRunArg, JobRunFailure, JobRunLog, Lock, Log4ROutputter, MD5, Request, Requestor, Schema, Task, TaskRun
Instance Method Summary collapse
- 
  
    
      #connect(*args)  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    Connect to a back-end database for persistence. 
- 
  
    
      #initialize(options = {})  ⇒ Database 
    
    
  
  
  
    constructor
  
  
  
  
  
  
  
    Instantiate a database back-end for persisting job and task runs. 
- 
  
    
      #log  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    Log database messages under the batch.database namespace. 
- 
  
    
      #perform_housekeeping  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    Purges detail records that are older than the retention threshold. 
Constructor Details
Instance Method Details
#connect(*args) ⇒ Object
Connect to a back-end database for persistence.
| 32 33 34 35 36 37 38 39 40 41 42 43 | # File 'lib/batch-kit/database.rb', line 32 def connect(*args) @schema.connect(*args) # We can only include the models once we have connected require_relative 'database/models' # Check if the database schema is up-to-date MD5.check_schema(@schema) # Perform housekeeping tasks perform_housekeeping end | 
#log ⇒ Object
Log database messages under the batch.database namespace.
| 23 24 25 | # File 'lib/batch-kit/database.rb', line 23 def log @log ||= BatchKit::LogManager.logger('batch.database') end | 
#perform_housekeeping ⇒ Object
Purges detail records that are older than the retention threshold.
| 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 | # File 'lib/batch-kit/database.rb', line 47 def perform_housekeeping # Only do housekeeping once per day return if JobRun.where{job_start_time > Date.today}.count > 0 log.info "Performing batch database housekeeping" # Abort jobs in Executing state that have not logged for 6+ hours @schema.connection.transaction do cutoff = Time.now - 6 * 60 * 60 exec_jobs = JobRun.where(job_status: 'EXECUTING').map(:job_run) curr_jobs = JobRunLog.select_group(:job_run). where(job_run: exec_jobs).having{max(log_time) > cutoff}.map(:job_run) abort_jobs = JobRun.where(job_run: exec_jobs - curr_jobs).all if abort_jobs.count > 0 log.detail "Cleaning up #{abort_jobs.count} zombie jobs" abort_tasks = TaskRun.where(job_run: abort_jobs.map(&:id), task_status: 'EXECUTING') abort_tasks.each(&:timeout) abort_jobs.each(&:timeout) end end # Purge locks that expired 6+ hours ago @schema.connection.transaction do purge_date = Time.now - 6 * 60 * 60 Lock.where{lock_expires_at < purge_date}.delete end # Purge log records for old job runs @schema.connection.transaction do purge_date = Date.today - @options.fetch(:log_retention_days, 60) purge_job_runs = JobRun.where(job_purged_flag: false). 
where{job_start_time < purge_date}.map(:job_run) if purge_job_runs.count > 0 log.detail "Purging log records for #{purge_job_runs.count} job runs" purge_job_runs.each_slice(1000).each do |purge_ids| JobRunLog.where(job_run: purge_ids).delete JobRun.where(job_run: purge_ids).update(job_purged_flag: true) end end end # Purge old task and job runs @schema.connection.transaction do purge_date = Date.today - @options.fetch(:job_run_retention_days, 365) purge_job_runs = JobRun.where{job_start_time < purge_date}.map(:job_run) if purge_job_runs.count > 0 log.detail "Purging job and task run records for #{purge_job_runs.count} job runs" purge_job_runs.each_slice(1000).each do |purge_ids| JobRunArg.where(job_run: purge_ids).delete TaskRun.where(job_run: purge_ids).delete JobRun.where(job_run: purge_ids).delete end end end # Purge old request runs @schema.connection.transaction do purge_date = Date.today - @options.fetch(:request_retention_days, 90) purge_requests = Request.where{request_launched_at < purge_date}.map(:request_id) if purge_requests.count > 0 log.detail "Purging request records for #{purge_requests.count} requests" purge_requests.each_slice(1000).each do |purge_ids| Request.where(request_id: purge_ids).delete Requestor.where(request_id: purge_ids).delete end end end # Purge jobs with no runs @schema.connection.transaction do purge_jobs = Job.left_join(:batch_job_run, :job_id => :job_id). where(Sequel.qualify(:batch_job_run, :job_id) => nil). select(Sequel.qualify(:batch_job, :job_id)).map(:job_id) if purge_jobs.count > 0 log.detail "Purging #{purge_jobs.count} old jobs" purge_jobs.each_slice(1000).each do |purge_ids| JobRunFailure.where(job_id: purge_ids).delete Task.where(job_id: purge_ids).delete Job.where(job_id: purge_ids).delete end end end end |