Class: Scheduler::MainProcess
- Inherits:
-
Object
- Object
- Scheduler::MainProcess
- Defined in:
- lib/scheduler/main_process.rb
Instance Attribute Summary collapse
-
#job_class ⇒ Class
The class of the main job model.
-
#logger ⇒ String
A logger file.
-
#max_concurrent_jobs ⇒ Integer
Maximum number of concurent jobs.
-
#pid ⇒ Integer
Pid of the main process.
-
#polling_interval ⇒ Integer
How much time to wait before each iteration.
Instance Method Summary collapse
-
#initialize ⇒ Scheduler::MainProcess
constructor
Creates a MainProcess which keeps running and continuously checks if new jobs are queued.
-
#reschedule_running_jobs ⇒ nil
Reschedules currently running jobs.
-
#start_loop ⇒ nil
Main loop.
Constructor Details
#initialize ⇒ Scheduler::MainProcess
Creates a MainProcess which keeps running and continuously checks if new jobs are queued.
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/scheduler/main_process.rb', line 20 def initialize @pid = Process.pid @logger = Scheduler.configuration.logger @job_class = Scheduler.configuration.job_class @polling_interval = Scheduler.configuration.polling_interval @max_concurrent_jobs = Scheduler.configuration.max_concurrent_jobs unless @logger.instance_of?(ActiveSupport::Logger) or @logger.instance_of?(Logger) @logger = Logger.new(@logger) end if @polling_interval < 1 @logger.warn "[Scheduler:#{@pid}] Warning: specified a polling interval lesser than 1: "\ "it will be forced to 1.".yellow @polling_interval = 1 end unless @job_class.included_modules.include? Scheduler::Schedulable raise "The given job class '#{@job_class}' is not a Schedulable class. "\ "Make sure to add 'include Scheduler::Schedulable' to your class." end @logger.info "[Scheduler:#{@pid}] Starting scheduler..".cyan self.start_loop end |
Instance Attribute Details
#job_class ⇒ Class
Returns the class of the main job model.
9 10 11 |
# File 'lib/scheduler/main_process.rb', line 9 def job_class @job_class end |
#logger ⇒ String
Returns a logger file.
7 8 9 |
# File 'lib/scheduler/main_process.rb', line 7 def logger @logger end |
#max_concurrent_jobs ⇒ Integer
Returns maximum number of concurent jobs.
13 14 15 |
# File 'lib/scheduler/main_process.rb', line 13 def max_concurrent_jobs @max_concurrent_jobs end |
#pid ⇒ Integer
Returns pid of the main process.
5 6 7 |
# File 'lib/scheduler/main_process.rb', line 5 def pid @pid end |
#polling_interval ⇒ Integer
Returns how much time to wait before each iteration.
11 12 13 |
# File 'lib/scheduler/main_process.rb', line 11 def polling_interval @polling_interval end |
Instance Method Details
#reschedule_running_jobs ⇒ nil
Reschedules currently running jobs.
124 125 126 127 128 129 130 131 132 133 |
# File 'lib/scheduler/main_process.rb', line 124 def reschedule_running_jobs @job_class.running.each do |job| begin Process.kill :QUIT, job.pid if job.pid.present? rescue Errno::ESRCH, Errno::EPERM ensure job.schedule end end end |
#start_loop ⇒ nil
Main loop.
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/scheduler/main_process.rb', line 50 def start_loop loop do begin # Loads up a job queue. queue = [] # Counts jobs to schedule. running_jobs = @job_class.running.entries schedulable_jobs = @job_class.queued.order_by(scheduled_at: :asc).entries jobs_to_schedule = @max_concurrent_jobs - running_jobs.count jobs_to_schedule = 0 if jobs_to_schedule < 0 # Finds out scheduled jobs waiting to be performed. scheduled_jobs = [] schedulable_jobs.first(jobs_to_schedule).each do |job| job_pid = Process.fork do begin job.perform_now rescue StandardError => e @logger.error "[Scheduler:#{@pid}] Error #{e.class}: #{e.} "\ "(#{e.backtrace.select { |l| l.include?('app') }.first}).".red end end Process.detach(job_pid) job.update_attribute(:pid, job_pid) scheduled_jobs << job queue << job.id.to_s end # Logs launched jobs if scheduled_jobs.any? @logger.info "[Scheduler:#{@pid}] Launched #{scheduled_jobs.count} "\ "jobs: #{scheduled_jobs.map(&:id).map(&:to_s).join(', ')}.".cyan else if schedulable_jobs.count == 0 @logger.info "[Scheduler:#{@pid}] No jobs in queue.".cyan else @logger.warn "[Scheduler:#{@pid}] No jobs launched, reached maximum "\ "number of concurrent jobs. Jobs in queue: #{schedulable_jobs.count}.".yellow end end # Checks for completed jobs: clears up queue and kills any zombie pid queue.delete_if do |job_id| job = @job_class.find(job_id) if job.present? and job.status.in? [ :completed, :error ] begin @logger.info "[Scheduler:#{@pid}] Rimosso processo #{job.pid} per lavoro completato".cyan Process.kill :QUIT, job.pid rescue Errno::ENOENT, Errno::ESRCH end true else false end end # Waits the specified amount of time before next iteration sleep @polling_interval rescue StandardError => error @logger.error "[Scheduler:#{@pid}] Error #{error.}".red @logger.error error.backtrace.select { |line| line.include?('app') }.join("\n").red rescue SignalException => signal if signal..in? [ 'SIGINT', 'SIGTERM', 'SIGQUIT' ] @logger.warn "[Scheduler:#{@pid}] Received interrupt, terminating scheduler..".yellow reschedule_running_jobs break end end end end |