Class: GoodJob::Scheduler

Inherits:
Object
Defined in:
lib/good_job/scheduler.rb

Overview

Schedulers are generic thread pools that are responsible for periodically checking for available tasks, executing tasks within a thread, and efficiently scaling active threads.

Every scheduler has a single JobPerformer that will execute tasks. The scheduler is responsible for calling its performer efficiently across threads managed by an instance of Concurrent::ThreadPoolExecutor. If a performer does not have work, the thread will go to sleep. The scheduler maintains an instance of Concurrent::TimerTask, which wakes sleeping threads and causes them to check whether the performer has new work.
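
The wake-and-check cycle described above can be pictured with a minimal sketch that uses only Ruby's standard library. It is not the GoodJob implementation: the classes SketchPerformer and SketchScheduler are hypothetical, a blocking Queue stands in for Concurrent::ThreadPoolExecutor, and wake-ups are sent by hand rather than by a timer.

```ruby
# Hypothetical stand-in for a performer: hands out queued work, or nil when idle.
class SketchPerformer
  def initialize(tasks)
    @tasks = Queue.new
    tasks.each { |task| @tasks << task }
  end

  # Runs one task and returns its result, or returns nil when there is no work.
  def next
    task = begin
      @tasks.pop(true) # non-blocking pop; raises ThreadError when empty
    rescue ThreadError
      return nil
    end
    task.call
  end
end

# Hypothetical miniature scheduler: idle workers sleep on a wake-up queue.
class SketchScheduler
  attr_reader :results

  def initialize(performer, max_threads: 2)
    @performer = performer
    @wakeups = Queue.new
    @results = Queue.new
    @threads = Array.new(max_threads) do
      Thread.new do
        # Queue#pop blocks, so a worker sleeps here until it is woken.
        while @wakeups.pop == :wake
          result = @performer.next
          @results << result unless result.nil?
        end
      end
    end
  end

  # Wake one sleeping worker so that it asks the performer for work.
  def wake
    @wakeups << :wake
  end

  # Stop every worker once the pending wake-ups have been handled.
  def shutdown
    @threads.each { @wakeups << :stop }
    @threads.each(&:join)
  end
end

performer = SketchPerformer.new([-> { 1 + 1 }, -> { 2 * 3 }])
scheduler = SketchScheduler.new(performer, max_threads: 2)
3.times { scheduler.wake } # the third wake-up finds no work and is a no-op
scheduler.shutdown
```

A wake-up that finds no work costs one call to the performer and nothing else, which is why waking threads periodically is cheap.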

Defined Under Namespace

Classes: ThreadPoolExecutor, TimerSet

Constant Summary

DEFAULT_EXECUTOR_OPTIONS =

Defaults for the instance of Concurrent::ThreadPoolExecutor. The thread pool executor is where work is performed.

{
  name: name,
  min_threads: 0,
  max_threads: Configuration::DEFAULT_MAX_THREADS,
  auto_terminate: true,
  idletime: 60,
  max_queue: Configuration::DEFAULT_MAX_THREADS,
  fallback_policy: :discard,
}.freeze

Constructor Details

#initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false) ⇒ Scheduler

Returns a new instance of Scheduler.

Parameters:

  • performer (GoodJob::JobPerformer)
  • max_threads (Numeric, nil) (defaults to: nil)

    maximum number of threads in the thread pool; when nil, the value from DEFAULT_EXECUTOR_OPTIONS is used

  • max_cache (Numeric, nil) (defaults to: nil)

    maximum number of scheduled jobs to cache in memory

  • warm_cache_on_initialize (Boolean) (defaults to: false)

    whether to warm the cache immediately, or manually by calling warm_cache

Raises:

  • (ArgumentError)

    if the performer does not respond to #next


# File 'lib/good_job/scheduler.rb', line 66

def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false)
  raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)

  self.class.instances << self

  @performer = performer

  @max_cache = max_cache || 0
  @executor_options = DEFAULT_EXECUTOR_OPTIONS.dup
  if max_threads.present?
    @executor_options[:max_threads] = max_threads
    @executor_options[:max_queue] = max_threads
  end
  @executor_options[:name] = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@executor_options[:max_threads]})"

  create_executor
  warm_cache if warm_cache_on_initialize
end
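
The option handling in the constructor can be sketched in isolation. This is an illustration under stated assumptions, not the library code: SKETCH_DEFAULTS and sketch_executor_options are hypothetical names, the default of 5 threads is an assumed value, and a plain nil check stands in for the ActiveSupport present? call.

```ruby
# Hypothetical defaults mirroring the shape of DEFAULT_EXECUTOR_OPTIONS.
SKETCH_DEFAULTS = {
  min_threads: 0,
  max_threads: 5, # assumed value; the real default comes from Configuration
  max_queue: 5,
  fallback_policy: :discard,
}.freeze

# Validates the performer, then builds per-instance executor options.
def sketch_executor_options(performer, max_threads: nil)
  raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)

  options = SKETCH_DEFAULTS.dup # dup returns an unfrozen copy of the frozen hash
  unless max_threads.nil?
    # The queue size follows the thread count, as in the constructor.
    options[:max_threads] = max_threads
    options[:max_queue] = max_threads
  end
  options
end

performer = Object.new
def performer.next; end

options = sketch_executor_options(performer, max_threads: 3)
```

Copying the frozen defaults keeps each scheduler's overrides local, so one instance cannot change the options another instance sees.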

Class Attribute Details

.instances ⇒ Array<GoodJob::Scheduler>? (readonly)

List of all instantiated Schedulers in the current process.

Returns:

  • (Array<GoodJob::Scheduler>, nil)



# File 'lib/good_job/scheduler.rb', line 35

cattr_reader :instances, default: [], instance_reader: false

Class Method Details

.from_configuration(configuration, warm_cache_on_initialize: false) ⇒ GoodJob::Scheduler, GoodJob::MultiScheduler

Creates GoodJob::Scheduler(s) and Performers from a GoodJob::Configuration instance.

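Parameters:

  • configuration (GoodJob::Configuration)
  • warm_cache_on_initialize (Boolean) (defaults to: false)

Returns:

  • (GoodJob::Scheduler, GoodJob::MultiScheduler)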



# File 'lib/good_job/scheduler.rb', line 41

def self.from_configuration(configuration, warm_cache_on_initialize: false)
  schedulers = configuration.queue_string.split(';').map do |queue_string_and_max_threads|
    queue_string, max_threads = queue_string_and_max_threads.split(':')
    max_threads = (max_threads || configuration.max_threads).to_i

    job_performer = GoodJob::JobPerformer.new(queue_string)
    GoodJob::Scheduler.new(
      job_performer,
      max_threads: max_threads,
      max_cache: configuration.max_cache,
      warm_cache_on_initialize: warm_cache_on_initialize
    )
  end

  if schedulers.size > 1
    GoodJob::MultiScheduler.new(schedulers)
  else
    schedulers.first
  end
end
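
The queue string handling above can be tried on its own. The helper below is a hypothetical sketch that mirrors only the split logic; the queue names and the default of 5 threads are made up for illustration.

```ruby
# Hypothetical helper mirroring the split logic of from_configuration.
def sketch_parse_queue_string(queue_string, default_max_threads)
  # Semicolons separate pools; an optional ":N" suffix sets a pool's thread count.
  queue_string.split(';').map do |queue_string_and_max_threads|
    queues, max_threads = queue_string_and_max_threads.split(':')
    { queues: queues, max_threads: (max_threads || default_max_threads).to_i }
  end
end

pools = sketch_parse_queue_string("mailers:2;default,low", 5)
# pools => [{ queues: "mailers", max_threads: 2 }, { queues: "default,low", max_threads: 5 }]
```

Each resulting pair corresponds to one scheduler, so a string with more than one semicolon-separated segment yields a GoodJob::MultiScheduler.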

Instance Method Details

#create_thread(state = nil) ⇒ Boolean?

Wakes a thread to allow the performer to execute a task.

Parameters:

  • state (Hash, nil) (defaults to: nil)

    Contextual information for the performer. See JobPerformer#next?.

Returns:

  • (Boolean, nil)

    Whether work was started.

    • nil if the scheduler is unable to take new work, for example if the thread pool is shut down or at capacity.

    • true if the performer started executing work.

    • false if the performer decides not to attempt to execute a task based on the state that is passed to it.



# File 'lib/good_job/scheduler.rb', line 138

def create_thread(state = nil)
  return nil unless executor.running?

  if state
    return false unless performer.next?(state)

    if state[:scheduled_at]
      scheduled_at = if state[:scheduled_at].is_a? String
                       Time.zone.parse state[:scheduled_at]
                     else
                       state[:scheduled_at]
                     end
      delay = [(scheduled_at - Time.current).to_f, 0].max
    end
  end

  delay ||= 0
  run_now = delay <= 0.01
  if run_now
    return nil unless executor.ready_worker_count.positive?
  elsif @max_cache.positive?
    return nil unless remaining_cache_count.positive?
  end

  create_task(delay)

  run_now ? true : nil
end
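
The delay arithmetic in this method can be checked in isolation. The helpers below are hypothetical and make one substitution: Time.iso8601 from the standard library stands in for Time.zone.parse, which requires ActiveSupport.

```ruby
require "time"

# Hypothetical helper mirroring the delay arithmetic in create_thread.
# Assumption: Time.iso8601 replaces the Time.zone.parse call.
def sketch_delay(scheduled_at, now)
  return 0 if scheduled_at.nil?

  scheduled_at = Time.iso8601(scheduled_at) if scheduled_at.is_a?(String)
  # Times in the past clamp to zero, so overdue work runs immediately.
  [(scheduled_at - now).to_f, 0].max
end

# Delays of at most 10 milliseconds are treated as "run now".
def sketch_run_now?(delay)
  delay <= 0.01
end

now = Time.utc(2024, 1, 1, 12, 0, 0)
future_delay = sketch_delay("2024-01-01T12:00:30Z", now) # 30 seconds ahead
past_delay = sketch_delay(now - 60, now)                 # already due
```

Per the last line of the method, only the run-now path returns true; a task created with a longer delay returns nil.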

#restart(timeout: -1) ⇒ void

This method returns an undefined value.

Restarts the scheduler. If the scheduler is running, it is shut down first; the executor is then recreated and the cache is warmed.

Parameters:

  • timeout (nil, Numeric) (defaults to: -1)

    Seconds to wait for actively executing jobs to finish; accepts the same values as #shutdown.



# File 'lib/good_job/scheduler.rb', line 123

def restart(timeout: -1)
  instrument("scheduler_restart_pools") do
    shutdown(timeout: timeout) if running?
    create_executor
    warm_cache
  end
end

#running? ⇒ Boolean?

Tests whether the scheduler is running.

Returns:

  • (Boolean, nil)


# File 'lib/good_job/scheduler.rb', line 87

delegate :running?, to: :executor, allow_nil: true

#shutdown(timeout: -1) ⇒ void

This method returns an undefined value.

Shut down the scheduler. This stops all threads in the thread pool. Use #shutdown? to determine whether threads have stopped.

Parameters:

  • timeout (Numeric, nil) (defaults to: -1)

    Seconds to wait for actively executing jobs to finish:

    • nil, the scheduler will trigger a shutdown but not wait for it to complete.

    • -1, the scheduler will wait until the shutdown is complete.

    • 0, the scheduler will shut down immediately and stop any active tasks.

    • A positive number will wait that many seconds before stopping any remaining active tasks.



# File 'lib/good_job/scheduler.rb', line 102

def shutdown(timeout: -1)
  return if executor.nil? || executor.shutdown?

  instrument("scheduler_shutdown_start", { timeout: timeout })
  instrument("scheduler_shutdown", { timeout: timeout }) do
    if executor.running?
      @timer_set.shutdown
      executor.shutdown
    end

    if executor.shuttingdown? && timeout
      executor_wait = timeout.negative? ? nil : timeout
      executor.kill unless executor.wait_for_termination(executor_wait)
    end
  end
end
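
How the timeout argument maps onto waiting behavior can be summarized with a small sketch. The helper sketch_shutdown_plan is hypothetical; it restates the branch logic of the method above and does not touch a real executor.

```ruby
# Hypothetical helper describing how shutdown interprets its timeout argument.
def sketch_shutdown_plan(timeout)
  # nil: trigger the shutdown but return without waiting.
  return { wait: false } if timeout.nil?

  # A negative timeout becomes nil, which means "wait without a time limit".
  # Zero or a positive number is the number of seconds to wait before
  # any tasks that are still active are stopped.
  { wait: true, wait_seconds: timeout.negative? ? nil : timeout }
end

sketch_shutdown_plan(nil) # => { wait: false }
sketch_shutdown_plan(-1)  # => { wait: true, wait_seconds: nil }
sketch_shutdown_plan(0)   # => { wait: true, wait_seconds: 0 }
sketch_shutdown_plan(10)  # => { wait: true, wait_seconds: 10 }
```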

#shutdown? ⇒ Boolean?

Tests whether the scheduler is shut down.

Returns:

  • (Boolean, nil)


# File 'lib/good_job/scheduler.rb', line 91

delegate :shutdown?, to: :executor, allow_nil: true

#stats ⇒ Hash

Returns information about the scheduler: thread counts and cache usage.

Returns:

  • (Hash)


# File 'lib/good_job/scheduler.rb', line 180

def stats
  {
    name: performer.name,
    max_threads: @executor_options[:max_threads],
    active_threads: @executor_options[:max_threads] - executor.ready_worker_count,
    available_threads: executor.ready_worker_count,
    max_cache: @max_cache,
    active_cache: cache_count,
    available_cache: remaining_cache_count,
  }
end

#warm_cache ⇒ void

This method returns an undefined value.

Preloads existing runnable and future-scheduled jobs. Does nothing when max_cache is zero.



# File 'lib/good_job/scheduler.rb', line 194

def warm_cache
  return if @max_cache.zero?

  future = Concurrent::Future.new(args: [self, @performer], executor: executor) do |thr_scheduler, thr_performer|
    Rails.application.executor.wrap do
      thr_performer.next_at(
        limit: @max_cache,
        now_limit: @executor_options[:max_threads]
      ).each do |scheduled_at|
        thr_scheduler.create_thread({ scheduled_at: scheduled_at })
      end
    end
  end

  observer = lambda do |_time, _output, thread_error|
    GoodJob.on_thread_error.call(thread_error) if thread_error && GoodJob.on_thread_error.respond_to?(:call)
    create_task # If cache-warming exhausts the threads, ensure there isn't an executable task remaining
  end
  future.add_observer(observer, :call)

  future.execute
end