Class: Scheduler::MainProcess

Inherits:
Object
  • Object
show all
Defined in:
lib/scheduler/main_process.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Scheduler::MainProcess

Creates a MainProcess which keeps running and continuously checks if new jobs are queued.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/scheduler/main_process.rb', line 20

# Creates a MainProcess which keeps running and continuously checks if new
# jobs are queued.
#
# Reads the logger, job class, polling interval and concurrency limit from
# Scheduler.configuration, validates them and then enters #start_loop, so the
# call only returns once that loop ends.
#
# @raise [RuntimeError] if the configured job class does not include
#   Scheduler::Schedulable.
def initialize
  @pid = Process.pid
  @logger = Scheduler.configuration.logger
  @job_class = Scheduler.configuration.job_class
  @polling_interval = Scheduler.configuration.polling_interval
  @max_concurrent_jobs = Scheduler.configuration.max_concurrent_jobs

  # Anything that does not already behave like a logger (a path, an IO, nil)
  # becomes the log device of a new Logger. The check is on the interface
  # rather than on the exact class, so Logger subclasses and logger-compatible
  # objects are used as they are instead of being wrapped as a log device.
  unless %i[info warn error].all? { |level| @logger.respond_to?(level) }
    @logger = Logger.new(@logger)
  end

  # The main loop sleeps for this many seconds between passes; values below 1
  # are clamped to 1.
  if @polling_interval < 1
    @logger.warn "[Scheduler:#{@pid}] Warning: specified a polling interval lesser than 1: "\
      "it will be forced to 1.".yellow
    @polling_interval = 1
  end

  unless @job_class.included_modules.include?(Scheduler::Schedulable)
    raise "The given job class '#{@job_class}' is not a Schedulable class. "\
      "Make sure to add 'include Scheduler::Schedulable' to your class."
  end

  @logger.info "[Scheduler:#{@pid}] Starting scheduler..".cyan
  start_loop
end

Instance Attribute Details

#job_class ⇒ Class

Returns the class of the main job model.

Returns:

  • (Class)

    the class of the main job model.



9
10
11
# File 'lib/scheduler/main_process.rb', line 9

# @return [Class] the class of the main job model.
def job_class = @job_class

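#logger ⇒ Logger

Returns the logger the scheduler writes to.

Returns:

  • (Logger)

    the logger the scheduler writes to; the constructor wraps a configured file path or IO in a Logger.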



7
8
9
# File 'lib/scheduler/main_process.rb', line 7

# @return [Object] the value held in @logger, i.e. the logger the scheduler
#   writes its messages to.
def logger = @logger

#max_concurrent_jobs ⇒ Integer

Returns the maximum number of concurrent jobs.

Returns:

  • (Integer)

    maximum number of concurrent jobs.



13
14
15
# File 'lib/scheduler/main_process.rb', line 13

# @return [Integer] maximum number of concurrent jobs.
def max_concurrent_jobs = @max_concurrent_jobs

#pid ⇒ Integer

Returns pid of the main process.

Returns:

  • (Integer)

    pid of the main process.



5
6
7
# File 'lib/scheduler/main_process.rb', line 5

# @return [Integer] pid of the main process.
def pid = @pid

#polling_interval ⇒ Integer

Returns how many seconds the main loop waits between iterations.

Returns:

  • (Integer)

    how many seconds the main loop waits between iterations.



11
12
13
# File 'lib/scheduler/main_process.rb', line 11

# @return [Integer] how many seconds to wait before each iteration.
def polling_interval = @polling_interval

Instance Method Details

#reschedule_running_jobs ⇒ nil

Reschedules currently running jobs.

Returns:

  • (nil)


124
125
126
127
128
129
130
131
132
133
# File 'lib/scheduler/main_process.rb', line 124

# Reschedules currently running jobs.
#
# Sends QUIT to the process of every running job that has a pid recorded and
# then calls +schedule+ on the job, whether or not the signal could be
# delivered.
#
# @return [nil]
def reschedule_running_jobs
  @job_class.running.each do |job|
    begin
      Process.kill(:QUIT, job.pid) if job.pid.present?
    rescue Errno::ESRCH, Errno::EPERM
      # The process no longer exists or may not be signalled by this process;
      # the job is put back in the queue regardless.
    ensure
      job.schedule
    end
  end

  # Return nil as documented instead of leaking the value returned by #each.
  nil
end

#start_loop ⇒ nil

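Main loop: on every pass it launches queued jobs up to the concurrency limit, then sleeps for the polling interval; it stops when an interrupt signal is received.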

Returns:

  • (nil)


50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/scheduler/main_process.rb', line 50

# Main loop.
#
# Repeats a polling pass (see #poll_once) followed by a pause of
# +@polling_interval+ seconds. A StandardError raised by a pass is logged and
# the loop carries on after the pause. When SIGINT, SIGTERM or SIGQUIT is
# received, running jobs are rescheduled and the loop ends; any other signal
# is ignored and the loop continues.
#
# @return [nil]
def start_loop
  loop do
    begin
      begin
        poll_once
      rescue StandardError => error
        @logger.error "[Scheduler:#{@pid}] Error #{error.message}".red
        @logger.error Array(error.backtrace).select { |line| line.include?('app') }.join("\n").red
      end

      # Waits the specified amount of time before next iteration. This also
      # runs after a failed pass, so a persistent error (for instance an
      # unreachable database) cannot turn the loop into a busy loop.
      sleep @polling_interval
    rescue SignalException => signal
      # The exception message is not a reliable signal name (an Interrupt may
      # carry an empty or custom message), so the signal number is used.
      next unless %w[INT TERM QUIT].include?(Signal.signame(signal.signo))

      @logger.warn "[Scheduler:#{@pid}] Received interrupt, terminating scheduler..".yellow
      reschedule_running_jobs
      break
    end
  end
end

# Runs one polling pass: forks queued jobs while there are free slots, logs
# the outcome and drops already-finished jobs from the list of launched ones.
private def poll_once
  # Counts jobs to schedule.
  running_jobs = @job_class.running.entries
  schedulable_jobs = @job_class.queued.order_by(scheduled_at: :asc).entries
  free_slots = [@max_concurrent_jobs - running_jobs.count, 0].max

  # Launches the oldest queued jobs, each one in its own forked process.
  launched_jobs = schedulable_jobs.first(free_slots).each do |job|
    job_pid = Process.fork do
      begin
        job.perform_now
      rescue StandardError => e
        @logger.error "[Scheduler:#{@pid}] Error #{e.class}: #{e.message} "\
          "(#{Array(e.backtrace).find { |l| l.include?('app') }}).".red
      end
    end
    Process.detach(job_pid)
    job.update_attribute(:pid, job_pid)
  end

  # Logs launched jobs
  if launched_jobs.any?
    @logger.info "[Scheduler:#{@pid}] Launched #{launched_jobs.count} "\
      "jobs: #{launched_jobs.map(&:id).map(&:to_s).join(', ')}.".cyan
  elsif schedulable_jobs.empty?
    @logger.info "[Scheduler:#{@pid}] No jobs in queue.".cyan
  else
    @logger.warn "[Scheduler:#{@pid}] No jobs launched, reached maximum "\
      "number of concurrent jobs. Jobs in queue: #{schedulable_jobs.count}.".yellow
  end

  # Checks for completed jobs: clears up queue and kills any zombie pid.
  # NOTE(review): the queue only ever holds the jobs launched during this same
  # pass, so jobs that finish later are never revisited here — confirm whether
  # it was meant to persist across passes.
  queue = launched_jobs.map { |job| job.id.to_s }
  queue.delete_if do |job_id|
    job = @job_class.find(job_id)
    next false unless job.present? && job.status.in?(%i[completed error])

    begin
      @logger.info "[Scheduler:#{@pid}] Rimosso processo #{job.pid} per lavoro completato".cyan
      Process.kill :QUIT, job.pid
    rescue Errno::ENOENT, Errno::ESRCH, Errno::EPERM
      # The process is already gone or may not be signalled: nothing to kill.
    end
    true
  end
end