Class: GoodJob::Scheduler
- Inherits:
-
Object
- Object
- GoodJob::Scheduler
- Defined in:
- lib/good_job/scheduler.rb
Overview
Schedulers are generic thread pools that are responsible for periodically checking for available tasks, executing tasks within a thread, and efficiently scaling active threads.
Every scheduler has a single Performer that will execute tasks. The scheduler is responsible for calling its performer efficiently across threads managed by an instance of Concurrent::ThreadPoolExecutor. If a performer does not have work, the thread will go to sleep. The scheduler maintains an instance of Concurrent::TimerTask, which wakes sleeping threads and causes them to check whether the performer has new work.
Defined Under Namespace
Classes: ThreadPoolExecutor
Constant Summary collapse
- DEFAULT_POOL_OPTIONS =
Defaults for instance of Concurrent::ThreadPoolExecutor The thread pool is where work is performed.
{ name: name, min_threads: 0, max_threads: Configuration::DEFAULT_MAX_THREADS, auto_terminate: true, idletime: 60, max_queue: -1, fallback_policy: :discard, }.freeze
Class Attribute Summary collapse
-
.instances ⇒ array<GoodJob:Scheduler>
readonly
List of all instantiated Schedulers in the current process.
Class Method Summary collapse
-
.from_configuration(configuration) ⇒ GoodJob::Scheduler, GoodJob::MultiScheduler
Creates GoodJob::Scheduler(s) and Performers from a GoodJob::Configuration instance.
Instance Method Summary collapse
-
#create_thread(state = nil) ⇒ nil, Boolean
Wakes a thread to allow the performer to execute a task.
-
#initialize(performer, max_threads: nil) ⇒ Scheduler
constructor
A new instance of Scheduler.
-
#restart(wait: true) ⇒ void
Restart the Scheduler.
-
#shutdown(wait: true) ⇒ void
Shut down the scheduler.
-
#shutdown? ⇒ true, ...
Tests whether the scheduler is shutdown.
Constructor Details
#initialize(performer, max_threads: nil) ⇒ Scheduler
Returns a new instance of Scheduler.
68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/good_job/scheduler.rb', line 68 def initialize(performer, max_threads: nil) raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next) self.class.instances << self @performer = performer = DEFAULT_POOL_OPTIONS.dup [:max_threads] = max_threads if max_threads.present? [:name] = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@pool_options[:max_threads]})" create_pool end |
Class Attribute Details
.instances ⇒ array<GoodJob:Scheduler> (readonly)
List of all instantiated Schedulers in the current process.
33 |
# File 'lib/good_job/scheduler.rb', line 33 cattr_reader :instances, default: [], instance_reader: false |
Class Method Details
.from_configuration(configuration) ⇒ GoodJob::Scheduler, GoodJob::MultiScheduler
Creates GoodJob::Scheduler(s) and Performers from a GoodJob::Configuration instance.
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/good_job/scheduler.rb', line 38 def self.from_configuration(configuration) schedulers = configuration.queue_string.split(';').map do |queue_string_and_max_threads| queue_string, max_threads = queue_string_and_max_threads.split(':') max_threads = (max_threads || configuration.max_threads).to_i job_query = GoodJob::Job.queue_string(queue_string) parsed = GoodJob::Job.queue_parser(queue_string) job_filter = proc do |state| if parsed[:exclude] parsed[:exclude].exclude?(state[:queue_name]) elsif parsed[:include] parsed[:include].include? state[:queue_name] else true end end job_performer = GoodJob::Performer.new(job_query, :perform_with_advisory_lock, name: queue_string, filter: job_filter) GoodJob::Scheduler.new(job_performer, max_threads: max_threads) end if schedulers.size > 1 GoodJob::MultiScheduler.new(schedulers) else schedulers.first end end |
Instance Method Details
#create_thread(state = nil) ⇒ nil, Boolean
Wakes a thread to allow the performer to execute a task.
123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/good_job/scheduler.rb', line 123 def create_thread(state = nil) return nil unless @pool.running? && @pool.ready_worker_count.positive? return false if state && !@performer.next?(state) future = Concurrent::Future.new(args: [@performer], executor: @pool) do |performer| output = nil Rails.application.executor.wrap { output = performer.next } output end future.add_observer(self, :task_observer) future.execute true end |
#restart(wait: true) ⇒ void
This method returns an undefined value.
Restart the Scheduler. When shutdown, start; or shutdown and start.
110 111 112 113 114 115 |
# File 'lib/good_job/scheduler.rb', line 110 def restart(wait: true) instrument("scheduler_restart_pools") do shutdown(wait: wait) unless shutdown? create_pool end end |
#shutdown(wait: true) ⇒ void
This method returns an undefined value.
Shut down the scheduler. This stops all threads in the pool. If wait is true, the scheduler will wait for any active tasks to finish. If wait is false, this method will return immediately even though threads may still be running. Use #shutdown? to determine whether threads have stopped.
89 90 91 92 93 94 95 96 97 98 |
# File 'lib/good_job/scheduler.rb', line 89 def shutdown(wait: true) return unless @pool&.running? instrument("scheduler_shutdown_start", { wait: wait }) instrument("scheduler_shutdown", { wait: wait }) do @pool.shutdown @pool.wait_for_termination if wait # TODO: Should be killed if wait is not true end end |
#shutdown? ⇒ true, ...
Tests whether the scheduler is shutdown.
102 103 104 |
# File 'lib/good_job/scheduler.rb', line 102 def shutdown? !@pool&.running? end |