Class: GoodJob::Scheduler
- Inherits:
-
Object
- Object
- GoodJob::Scheduler
- Defined in:
- lib/good_job/scheduler.rb
Overview
Schedulers are generic thread pools that are responsible for periodically checking for available tasks, executing tasks within a thread, and efficiently scaling active threads.
Every scheduler has a single JobPerformer that will execute tasks. The scheduler is responsible for calling its performer efficiently across threads managed by an instance of Concurrent::ThreadPoolExecutor. If a performer does not have work, the thread will go to sleep. The scheduler maintains an instance of Concurrent::TimerTask, which wakes sleeping threads and causes them to check whether the performer has new work.
Defined Under Namespace
Classes: ThreadPoolExecutor, TimerSet
Constant Summary collapse
- DEFAULT_EXECUTOR_OPTIONS =
Defaults for instance of Concurrent::ThreadPoolExecutor The thread pool executor is where work is performed.
{ name: name, min_threads: 0, max_threads: Configuration::DEFAULT_MAX_THREADS, auto_terminate: true, idletime: 60, max_queue: Configuration::DEFAULT_MAX_THREADS, fallback_policy: :discard, }.freeze
Class Attribute Summary collapse
-
.instances ⇒ Array<GoodJob::Scheduler>?
readonly
List of all instantiated Schedulers in the current process.
Class Method Summary collapse
-
.from_configuration(configuration, warm_cache_on_initialize: false) ⇒ GoodJob::Scheduler, GoodJob::MultiScheduler
Creates GoodJob::Scheduler(s) and Performers from a GoodJob::Configuration instance.
Instance Method Summary collapse
-
#create_thread(state = nil) ⇒ Boolean?
Wakes a thread to allow the performer to execute a task.
-
#initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false) ⇒ Scheduler
constructor
A new instance of Scheduler.
-
#restart(timeout: -1)) ⇒ void
Restart the Scheduler.
-
#running? ⇒ Boolean?
Tests whether the scheduler is running.
-
#shutdown(timeout: -1)) ⇒ void
Shut down the scheduler.
-
#shutdown? ⇒ Boolean?
Tests whether the scheduler is shutdown.
-
#stats ⇒ Hash
Information about the Scheduler.
-
#warm_cache ⇒ void
Preload existing runnable and future-scheduled jobs.
Constructor Details
#initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false) ⇒ Scheduler
Returns a new instance of Scheduler.
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/good_job/scheduler.rb', line 66 def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false) raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next) self.class.instances << self @performer = performer @max_cache = max_cache || 0 @executor_options = DEFAULT_EXECUTOR_OPTIONS.dup if max_threads.present? @executor_options[:max_threads] = max_threads @executor_options[:max_queue] = max_threads end @executor_options[:name] = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@executor_options[:max_threads]})" create_executor warm_cache if warm_cache_on_initialize end |
Class Attribute Details
.instances ⇒ Array<GoodJob::Scheduler>? (readonly)
List of all instantiated Schedulers in the current process.
35 |
# File 'lib/good_job/scheduler.rb', line 35 cattr_reader :instances, default: [], instance_reader: false |
Class Method Details
.from_configuration(configuration, warm_cache_on_initialize: false) ⇒ GoodJob::Scheduler, GoodJob::MultiScheduler
Creates GoodJob::Scheduler(s) and Performers from a GoodJob::Configuration instance.
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/good_job/scheduler.rb', line 41 def self.from_configuration(configuration, warm_cache_on_initialize: false) schedulers = configuration.queue_string.split(';').map do |queue_string_and_max_threads| queue_string, max_threads = queue_string_and_max_threads.split(':') max_threads = (max_threads || configuration.max_threads).to_i job_performer = GoodJob::JobPerformer.new(queue_string) GoodJob::Scheduler.new( job_performer, max_threads: max_threads, max_cache: configuration.max_cache, warm_cache_on_initialize: warm_cache_on_initialize ) end if schedulers.size > 1 GoodJob::MultiScheduler.new(schedulers) else schedulers.first end end |
Instance Method Details
#create_thread(state = nil) ⇒ Boolean?
Wakes a thread to allow the performer to execute a task.
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
# File 'lib/good_job/scheduler.rb', line 138 def create_thread(state = nil) return nil unless executor.running? if state return false unless performer.next?(state) if state[:scheduled_at] scheduled_at = if state[:scheduled_at].is_a? String Time.zone.parse state[:scheduled_at] else state[:scheduled_at] end delay = [(scheduled_at - Time.current).to_f, 0].max end end delay ||= 0 run_now = delay <= 0.01 if run_now return nil unless executor.ready_worker_count.positive? elsif @max_cache.positive? return nil unless remaining_cache_count.positive? end create_task(delay) run_now ? true : nil end |
#restart(timeout: -1)) ⇒ void
This method returns an undefined value.
Restart the Scheduler. When shutdown, start; or shutdown and start.
123 124 125 126 127 128 129 |
# File 'lib/good_job/scheduler.rb', line 123 def restart(timeout: -1) instrument("scheduler_restart_pools") do shutdown(timeout: timeout) if running? create_executor warm_cache end end |
#running? ⇒ Boolean?
Tests whether the scheduler is running.
87 |
# File 'lib/good_job/scheduler.rb', line 87 delegate :running?, to: :executor, allow_nil: true |
#shutdown(timeout: -1)) ⇒ void
This method returns an undefined value.
Shut down the scheduler. This stops all threads in the thread pool. Use #shutdown? to determine whether threads have stopped.
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/good_job/scheduler.rb', line 102 def shutdown(timeout: -1) return if executor.nil? || executor.shutdown? instrument("scheduler_shutdown_start", { timeout: timeout }) instrument("scheduler_shutdown", { timeout: timeout }) do if executor.running? @timer_set.shutdown executor.shutdown end if executor.shuttingdown? && timeout executor_wait = timeout.negative? ? nil : timeout executor.kill unless executor.wait_for_termination(executor_wait) end end end |
#shutdown? ⇒ Boolean?
Tests whether the scheduler is shutdown.
91 |
# File 'lib/good_job/scheduler.rb', line 91 delegate :shutdown?, to: :executor, allow_nil: true |
#stats ⇒ Hash
Information about the Scheduler
180 181 182 183 184 185 186 187 188 189 190 |
# File 'lib/good_job/scheduler.rb', line 180 def stats { name: performer.name, max_threads: @executor_options[:max_threads], active_threads: @executor_options[:max_threads] - executor.ready_worker_count, available_threads: executor.ready_worker_count, max_cache: @max_cache, active_cache: cache_count, available_cache: remaining_cache_count, } end |
#warm_cache ⇒ void
This method returns an undefined value.
Preload existing runnable and future-scheduled jobs
194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
# File 'lib/good_job/scheduler.rb', line 194 def warm_cache return if @max_cache.zero? future = Concurrent::Future.new(args: [self, @performer], executor: executor) do |thr_scheduler, thr_performer| Rails.application.executor.wrap do thr_performer.next_at( limit: @max_cache, now_limit: @executor_options[:max_threads] ).each do |scheduled_at| thr_scheduler.create_thread({ scheduled_at: scheduled_at }) end end end observer = lambda do |_time, _output, thread_error| GoodJob.on_thread_error.call(thread_error) if thread_error && GoodJob.on_thread_error.respond_to?(:call) create_task # If cache-warming exhausts the threads, ensure there isn't an executable task remaining end future.add_observer(observer, :call) future.execute end |