Class: Bricolage::JobNetRunner
- Inherits:
-
Object
- Object
- Bricolage::JobNetRunner
- Defined in:
- lib/bricolage/jobnetrunner.rb
Defined Under Namespace
Classes: Options
Constant Summary collapse
- EXIT_SUCCESS =
JobResult::EXIT_SUCCESS
- EXIT_FAILURE =
JobResult::EXIT_FAILURE
- EXIT_ERROR =
JobResult::EXIT_ERROR
Class Method Summary collapse
- .main ⇒ Object
Instance Method Summary collapse
- #check_jobs(queue) ⇒ Object
- #enqueue_jobs(jobnet, queue) ⇒ Object
- #error_exit(msg) ⇒ Object
- #execute_job(ref, queue) ⇒ Object
- #get_queue(opts) ⇒ Object
- #initialize ⇒ JobNetRunner (constructor): A new instance of JobNetRunner.
- #list_jobs(queue) ⇒ Object
- #logger ⇒ Object
- #main ⇒ Object
- #make_log_path(job_ref) ⇒ Object
- #print_error(msg) ⇒ Object
- #program_name ⇒ Object
- #run_queue(queue) ⇒ Object
- #usage_exit(msg, usage) ⇒ Object
Constructor Details
#initialize ⇒ JobNetRunner
23 24 25 26 27 28 29 |
# File 'lib/bricolage/jobnetrunner.rb', line 23

# Builds a new runner and resets its per-run state.
#
# The jobnet start time is captured here, at construction time, and is
# later used for elapsed-time logging and log path expansion.
def initialize
  # Ignore SIGPIPE for this process.
  Signal.trap('PIPE', 'IGNORE')

  @jobnet_start_time = Time.now
  @hooks = ::Bricolage
  @jobnet_id = nil
  @log_path = nil
end
Class Method Details
.main ⇒ Object
19 20 21 |
# File 'lib/bricolage/jobnetrunner.rb', line 19

# Class-level entry point: creates a runner and invokes its #main.
def self.main
  runner = new
  runner.main
end
Instance Method Details
#check_jobs(queue) ⇒ Object
97 98 99 100 101 |
# File 'lib/bricolage/jobnetrunner.rb', line 97

# Loads and compiles the job behind every queued task, without executing it.
#
# @param queue [#each] task queue; each task must respond to #job
# @return whatever queue.each returns
def check_jobs(queue)
  queue.each do |task|
    job = Job.load_ref(task.job, @ctx)
    job.compile
  end
end
#enqueue_jobs(jobnet, queue) ⇒ Object
82 83 84 85 86 87 88 89 |
# File 'lib/bricolage/jobnetrunner.rb', line 82

# Wraps every job reference of the jobnet in a JobTask, appends it to the
# queue in the order given by jobnet.sequential_jobs, then saves the queue.
#
# @param jobnet [#sequential_jobs] source of job references
# @param queue [#enq, #save] destination queue
# @return the result of queue.save
def enqueue_jobs(jobnet, queue)
  jobnet.sequential_jobs.each { |ref| queue.enq JobTask.new(ref) }
  queue.save
end
#error_exit(msg) ⇒ Object
160 161 162 163 |
# File 'lib/bricolage/jobnetrunner.rb', line 160

# Reports msg through #print_error and terminates the process with status 1.
#
# @param msg [String] error text to report
def error_exit(msg)
  print_error(msg)
  exit(1)
end
#execute_job(ref, queue) ⇒ Object
119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/bricolage/jobnetrunner.rb', line 119

# Loads, compiles and executes a single job, running the before/after job
# hooks around the execution.
#
# @param ref job reference to execute
# @param queue task queue (not used by this method)
# @return the job's execution result, or JobResult.error(exception) when
#   anything raises along the way
def execute_job(ref, queue)
  logger.debug "job #{ref}"
  job = Job.load_ref(ref, @ctx)
  job.compile
  @hooks.run_before_job_hooks(BeforeJobEvent.new(ref))
  job.execute_in_process(make_log_path(ref)).tap do |result|
    @hooks.run_after_job_hooks(AfterJobEvent.new(result))
  end
rescue Exception => err
  # Rescues Exception rather than StandardError: every failure raised above
  # is logged and converted into an error result instead of propagating.
  logger.exception err
  logger.error "unexpected error: #{ref} (#{err.class}: #{err.message})"
  JobResult.error(err)
end
#get_queue(opts) ⇒ Object
74 75 76 77 78 79 80 |
# File 'lib/bricolage/jobnetrunner.rb', line 74

# Chooses the task queue implementation for this run.
#
# @param opts [#queue_path] parsed options
# @return a plain TaskQueue when no queue path is set; otherwise the result
#   of FileTaskQueue.restore_if_exist for that path
def get_queue(opts)
  return TaskQueue.new unless opts.queue_path

  FileTaskQueue.restore_if_exist(opts.queue_path)
end
#list_jobs(queue) ⇒ Object
91 92 93 94 95 |
# File 'lib/bricolage/jobnetrunner.rb', line 91

# Prints the job of every queued task to standard output, one per line.
#
# @param queue [#each] task queue; each task must respond to #job
# @return whatever queue.each returns
def list_jobs(queue)
  queue.each { |task| puts task.job }
end
#logger ⇒ Object
70 71 72 |
# File 'lib/bricolage/jobnetrunner.rb', line 70

# Returns the logger of the current context (@ctx).
# @ctx is assigned in #main, so this is only usable after that point.
def logger
  @ctx.logger
end
#main ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/bricolage/jobnetrunner.rb', line 35

# Command entry point for one jobnet run.
#
# Steps, in order:
# 1. Parse ARGV (after running the before-option-parsing hooks).
# 2. Build the application context and load the jobnet file.
# 3. Obtain the task queue; refuse to continue while it is locked.
# 4. Enqueue the jobnet's jobs unless the queue already holds tasks.
# 5. Depending on the options: list the jobs and exit, compile-check the
#    jobs and exit, or run the queue.
#
# Successful paths end with `exit EXIT_SUCCESS`. OptionError and
# ApplicationError are reported on stderr and end the process with status 1,
# unless $DEBUG is set, in which case they are re-raised.
def main
  opts = Options.new(self)
  @hooks.run_before_option_parsing_hooks(opts)
  opts.parse ARGV

  @ctx = Context.for_application(
    nil,
    opts.jobnet_file,
    environment: opts.environment,
    global_variables: opts.global_variables
  )
  # Jobnet ID is "<parent directory name>/<file name without .jobnet>".
  @jobnet_id = "#{opts.jobnet_file.dirname.basename}/#{opts.jobnet_file.basename('.jobnet')}"
  @log_path = opts.log_path
  jobnet = RootJobNet.load(@ctx, opts.jobnet_file)
  queue = get_queue(opts)

  if queue.locked?
    raise ParameterError, "Job queue is still locked. If you are sure to restart jobnet, #{queue.unlock_help}"
  end

  # Only fill the queue when it does not already contain tasks.
  unless queue.queued?
    enqueue_jobs jobnet, queue
    logger.info "jobs are queued." if opts.queue_exist?
  end

  if opts.list_jobs?
    list_jobs queue
    exit EXIT_SUCCESS
  end

  check_jobs queue
  if opts.check_only?
    puts "OK"
    exit EXIT_SUCCESS
  end

  run_queue queue
  exit EXIT_SUCCESS
rescue OptionError => ex
  raise if $DEBUG
  usage_exit ex.message, opts.help
rescue ApplicationError => ex
  raise if $DEBUG
  error_exit ex.message
end
#make_log_path(job_ref) ⇒ Object
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/bricolage/jobnetrunner.rb', line 133

# Expands the `%{name}` placeholders of the configured log path template
# (@log_path) for one job.
#
# Recognized placeholders:
# - %{jobnet_start_date}, %{jobnet_start_time}: from @jobnet_start_time
# - %{job_start_date}, %{job_start_time}: from the time of this call
# - %{jobnet}, %{net}, %{jobnet_id}, %{net_id}, %{flow}, %{flow_id}:
#   @jobnet_id with every "/" replaced by "::"
# - %{subsystem}: job_ref.subsystem
# - %{job}, %{job_id}: job_ref.name
#
# @param job_ref [#subsystem, #name] reference of the job being logged
# @return [String, nil] the expanded path, or nil when no log path is set
# @raise [ParameterError] when the template contains an unknown placeholder
def make_log_path(job_ref)
  return nil unless @log_path

  job_started_at = Time.now
  @log_path.gsub(/%\{\w+\}/) do |placeholder|
    case placeholder
    when '%{jobnet_start_date}'
      @jobnet_start_time.strftime('%Y%m%d')
    when '%{jobnet_start_time}'
      @jobnet_start_time.strftime('%Y%m%d_%H%M%S%L')
    when '%{job_start_date}'
      job_started_at.strftime('%Y%m%d')
    when '%{job_start_time}'
      job_started_at.strftime('%Y%m%d_%H%M%S%L')
    when '%{jobnet}', '%{net}', '%{jobnet_id}', '%{net_id}', '%{flow}', '%{flow_id}'
      @jobnet_id.gsub('/', '::')
    when '%{subsystem}'
      job_ref.subsystem
    when '%{job}', '%{job_id}'
      job_ref.name
    else
      raise ParameterError, "bad log path variable: #{placeholder}"
    end
  end
end
#print_error(msg) ⇒ Object
165 166 167 |
# File 'lib/bricolage/jobnetrunner.rb', line 165

# Writes an error line of the form "<program name>: error: <msg>" to stderr.
#
# @param msg [String] error text to report
def print_error(msg)
  $stderr.puts("#{program_name}: error: #{msg}")
end
#program_name ⇒ Object
169 170 171 |
# File 'lib/bricolage/jobnetrunner.rb', line 169

# Returns the running program's file name with its directory and extension
# removed.
#
# @return [String]
def program_name
  script_path = $PROGRAM_NAME
  File.basename(script_path, '.*')
end
#run_queue(queue) ⇒ Object
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/bricolage/jobnetrunner.rb', line 103

# Executes the queued tasks one after another, stopping at the first failure.
#
# The before-all-jobs hooks run first. On the first unsuccessful job the
# total elapsed time and the job's message are logged, the after-all-jobs
# hooks run with a `false` flag, and the process exits with that job's
# status. When every job succeeds, the after-all-jobs hooks run with `true`
# and the total elapsed time is logged.
#
# @param queue [#consume_each] task queue; each task must respond to #job
def run_queue(queue)
  @hooks.run_before_all_jobs_hooks(BeforeAllJobsEvent.new(@jobnet_id, queue))

  queue.consume_each do |task|
    result = execute_job(task.job, queue)
    next if result.success?

    # First failure: report, notify the hooks, and abort the whole run.
    logger.elapsed_time 'jobnet total: ', (Time.now - @jobnet_start_time)
    logger.error "[job #{task.job}] #{result.message}"
    @hooks.run_after_all_jobs_hooks(AfterAllJobsEvent.new(false, queue))
    exit result.status
  end

  @hooks.run_after_all_jobs_hooks(AfterAllJobsEvent.new(true, queue))
  logger.elapsed_time 'jobnet total: ', (Time.now - @jobnet_start_time)
  logger.info "status all green"
end
#usage_exit(msg, usage) ⇒ Object
154 155 156 157 158 |
# File 'lib/bricolage/jobnetrunner.rb', line 154

# Reports msg through #print_error, prints the usage text to stderr, and
# terminates the process with status 1.
#
# @param msg [String] error text to report
# @param usage [String] usage/help text printed after the error line
def usage_exit(msg, usage)
  print_error(msg)
  $stderr.puts(usage)
  exit(1)
end