Module: Resque::Scheduler
- Extended by: Configuration, Locking, SignalHandling
- Defined in:
  lib/resque/scheduler.rb,
  lib/resque/scheduler/cli.rb,
  lib/resque/scheduler/env.rb,
  lib/resque/scheduler/util.rb,
  lib/resque/scheduler/plugin.rb,
  lib/resque/scheduler/server.rb,
  lib/resque/scheduler/locking.rb,
  lib/resque/scheduler/version.rb,
  lib/resque/scheduler/extension.rb,
  lib/resque/scheduler/lock/base.rb,
  lib/resque/scheduler/lock/basic.rb,
  lib/resque/scheduler/configuration.rb,
  lib/resque/scheduler/lock/resilient.rb,
  lib/resque/scheduler/logger_builder.rb,
  lib/resque/scheduler/failure_handler.rb,
  lib/resque/scheduler/signal_handling.rb,
  lib/resque/scheduler/delaying_extensions.rb,
  lib/resque/scheduler/scheduling_extensions.rb
Defined Under Namespace
Modules: Configuration, DelayingExtensions, Extension, Lock, Locking, Plugin, SchedulingExtensions, Server, SignalHandling
Classes: Cli, Env, FailureHandler, LoggerBuilder, Util
Constant Summary
- CLI_OPTIONS_ENV_MAPPING =
    {
      app_name: 'APP_NAME',
      background: 'BACKGROUND',
      dynamic: 'DYNAMIC_SCHEDULE',
      env: 'RAILS_ENV',
      initializer_path: 'INITIALIZER_PATH',
      logfile: 'LOGFILE',
      logformat: 'LOGFORMAT',
      quiet: 'QUIET',
      pidfile: 'PIDFILE',
      poll_sleep_amount: 'RESQUE_SCHEDULER_INTERVAL',
      verbose: 'VERBOSE'
    }
- VERSION =
'4.2.0'
Class Attribute Summary
- .failure_handler ⇒ Object
- .logger ⇒ Object
- .scheduled_jobs ⇒ Object (readonly)
  The Rufus::Scheduler jobs that are scheduled.
Attributes included from Configuration
#app_name, #dynamic, #env, #logfile, #logformat, #poll_sleep_amount, #quiet, #verbose
Attributes included from SignalHandling
Class Method Summary
- .before_shutdown ⇒ Object
- .clear_schedule! ⇒ Object
  Stops old rufus scheduler and creates a new one.
- .enqueue(config) ⇒ Object
- .enqueue_delayed_items_for_timestamp(timestamp) ⇒ Object
  Enqueues all delayed jobs for a timestamp.
- .enqueue_from_config(job_config) ⇒ Object
  Enqueues a job based on a config hash.
- .enqueue_next_item(timestamp) ⇒ Object
- .env_matches?(configured_env) ⇒ Boolean
  Returns true if the current env is non-nil and the configured env (a comma- or whitespace-separated string) includes the current env.
- .handle_delayed_items(at_time = nil) ⇒ Object
  Handles queueing delayed items. at_time: time to start scheduling items (default: now).
- .handle_shutdown ⇒ Object
- .handle_signals_with_operation ⇒ Object
- .load_schedule! ⇒ Object
  Pulls the schedule from Resque.schedule and loads it into the rufus scheduler instance.
- .load_schedule_job(name, config) ⇒ Object
  Loads a job schedule into the Rufus::Scheduler and stores it in @scheduled_jobs.
- .log(msg) ⇒ Object
- .log!(msg) ⇒ Object
- .log_error(msg) ⇒ Object
- .optionizate_interval_value(value) ⇒ Object
  Converts an interval value into a value with options, if options are available.
- .poll_sleep ⇒ Object
  Sleeps and returns true.
- .poll_sleep_loop ⇒ Object
- .print_schedule ⇒ Object
- .procline(string) ⇒ Object
- .rails_env_matches?(config) ⇒ Boolean
  Returns true if the given schedule config hash matches the current env.
- .reload_schedule! ⇒ Object
- .rufus_scheduler ⇒ Object
- .run ⇒ Object
  Schedules all jobs and continually looks for delayed jobs (never returns).
- .shutdown ⇒ Object
  Sets the shutdown flag, cleans schedules, and exits if sleeping.
- .unschedule_job(name) ⇒ Object
- .update_schedule ⇒ Object
Methods included from Locking
master?, master_lock, release_master_lock, release_master_lock!, supports_lua?
Methods included from Configuration
Methods included from SignalHandling
handle_signals, register_signal_handlers
Class Attribute Details
.failure_handler ⇒ Object
    # File 'lib/resque/scheduler.rb', line 404
    def failure_handler
      @failure_handler ||= Resque::Scheduler::FailureHandler
    end
.logger ⇒ Object
    # File 'lib/resque/scheduler.rb', line 408
    def logger
      @logger ||= Resque::Scheduler::LoggerBuilder.new(
        quiet: quiet,
        verbose: verbose,
        log_dev: logfile,
        format: logformat
      ).build
    end
.scheduled_jobs ⇒ Object (readonly)
the Rufus::Scheduler jobs that are scheduled
    # File 'lib/resque/scheduler.rb', line 29
    def scheduled_jobs
      @scheduled_jobs
    end
Class Method Details
.before_shutdown ⇒ Object
    # File 'lib/resque/scheduler.rb', line 373
    def before_shutdown
      release_master_lock
    end
.clear_schedule! ⇒ Object
Stops old rufus scheduler and creates a new one. Returns the new rufus scheduler.
    # File 'lib/resque/scheduler.rb', line 291
    def clear_schedule!
      rufus_scheduler.stop
      @rufus_scheduler = nil
      @scheduled_jobs = {}
      rufus_scheduler
    end
.enqueue(config) ⇒ Object
    # File 'lib/resque/scheduler.rb', line 216
    def enqueue(config)
      enqueue_from_config(config)
    rescue => e
      Resque::Scheduler.failure_handler.on_enqueue_failure(config, e)
    end
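The rescue in `.enqueue` hands every enqueue error to whatever object `.failure_handler` returns, by calling `on_enqueue_failure(config, e)` on it. The following is a minimal, self-contained sketch of that contract. `CollectingFailureHandler` and `safe_enqueue` are hypothetical names used only for illustration; they are not part of resque-scheduler, and the sketch does not load the gem.

```ruby
# Hypothetical handler: any object responding to
# on_enqueue_failure(config, error) can play this role.
class CollectingFailureHandler
  def self.failures
    @failures ||= []
  end

  def self.on_enqueue_failure(config, error)
    failures << [config['class'], error.message]
  end
end

# Stand-in for the shape of Resque::Scheduler.enqueue (an assumption, not
# the real method): run the enqueue step and route any StandardError to
# the handler instead of letting it propagate.
def safe_enqueue(config, handler)
  yield config
rescue => e
  handler.on_enqueue_failure(config, e)
end

safe_enqueue({ 'class' => 'ReportJob' }, CollectingFailureHandler) do |_config|
  raise ArgumentError, 'queue is unreachable'
end

p CollectingFailureHandler.failures
```

Because the error is rescued, a single failing job does not stop the scheduler loop; what happens to the error is entirely up to the handler.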
.enqueue_delayed_items_for_timestamp(timestamp) ⇒ Object
Enqueues all delayed jobs for a timestamp.
    # File 'lib/resque/scheduler.rb', line 203
    def enqueue_delayed_items_for_timestamp(timestamp)
      item = nil
      loop do
        handle_shutdown do
          # Continually check that it is still the master
          item = enqueue_next_item(timestamp) if master?
        end
        # continue processing until there are no more ready items in this
        # timestamp
        break if item.nil?
      end
    end
.enqueue_from_config(job_config) ⇒ Object
Enqueues a job based on a config hash.
    # File 'lib/resque/scheduler.rb', line 229
    def enqueue_from_config(job_config)
      args = job_config['args'] || job_config[:args]

      klass_name = job_config['class'] || job_config[:class]
      begin
        klass = Resque::Scheduler::Util.constantize(klass_name)
      rescue NameError
        klass = klass_name
      end

      params = args.is_a?(Hash) ? [args] : Array(args)
      queue = job_config['queue'] ||
              job_config[:queue] ||
              Resque.queue_from_class(klass)
      # Support custom job classes like those that inherit from
      # Resque::JobWithStatus (resque-status)
      job_klass = job_config['custom_job_class']
      if job_klass && job_klass != 'Resque::Job'
        # The custom job class API must offer a static "scheduled" method. If
        # the custom job class can not be constantized (via a requeue call
        # from the web perhaps), fall back to enqueuing normally via
        # Resque::Job.create.
        begin
          Resque::Scheduler::Util.constantize(job_klass).scheduled(
            queue, klass_name, *params
          )
        rescue NameError
          # Note that the custom job class (job_config['custom_job_class'])
          # is the one enqueued
          Resque::Job.create(queue, job_klass, *params)
        end
      else
        # Hack to avoid havoc for people shoving stuff into queues
        # for non-existent classes (for example: running scheduler in
        # one app that schedules for another).
        if Class === klass
          Resque::Scheduler::Plugin.run_before_delayed_enqueue_hooks(
            klass, *params
          )

          # If the class is a custom job class, call self#scheduled on it.
          # This allows you to do things like Resque.enqueue_at(timestamp,
          # CustomJobClass). Otherwise, pass off to Resque.
          if klass.respond_to?(:scheduled)
            klass.scheduled(queue, klass_name, *params)
          else
            Resque.enqueue_to(queue, klass, *params)
          end
        else
          # This will not run the before_hooks in rescue, but will at least
          # queue the job.
          Resque::Job.create(queue, klass, *params)
        end
      end
    end
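The line `params = args.is_a?(Hash) ? [args] : Array(args)` in `.enqueue_from_config` treats a Hash specially, because `Array()` would otherwise flatten it into key/value pairs. A small sketch of that normalization on its own follows; `normalize_params` is a hypothetical helper name, not a method of the library.

```ruby
# Hypothetical helper isolating the args normalization shown above.
def normalize_params(args)
  args.is_a?(Hash) ? [args] : Array(args)
end

p normalize_params(nil)              # => []
p normalize_params('report')         # => ["report"]
p normalize_params([1, 2])           # => [1, 2]
p normalize_params({ 'id' => 7 })    # => [{"id"=>7}] (one Hash argument)

# Without the Hash check, the job would receive pairs instead of a Hash:
p Array({ 'id' => 7 })               # => [["id", 7]]
```

The result is always an Array, so it can be splatted into the job's arguments as `*params`.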
.enqueue_next_item(timestamp) ⇒ Object
    # File 'lib/resque/scheduler.rb', line 191
    def enqueue_next_item(timestamp)
      item = Resque.next_item_for_timestamp(timestamp)

      if item
        log "queuing #{item['class']} [delayed]"
        enqueue(item)
      end

      item
    end
.env_matches?(configured_env) ⇒ Boolean
Returns true if the current env is non-nil and the configured env (a comma- or whitespace-separated string) includes the current env.
    # File 'lib/resque/scheduler.rb', line 174
    def env_matches?(configured_env)
      env && configured_env.split(/[\s,]+/).include?(env)
    end
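To make the matching rule concrete, here is a standalone sketch of the same check. It takes the current env as an explicit argument instead of reading `Resque::Scheduler.env`, and the function name `env_in_list?` is hypothetical. Unlike the original, it always returns a strict Boolean.

```ruby
# Hypothetical standalone version of the check: the configured value is
# split on runs of whitespace and/or commas, then searched for the env.
def env_in_list?(current_env, configured_env)
  return false if current_env.nil?

  configured_env.split(/[\s,]+/).include?(current_env)
end

p env_in_list?('production', 'staging, production')   # => true
p env_in_list?('production', 'staging production')    # => true
p env_in_list?('development', 'staging,production')   # => false
p env_in_list?(nil, 'production')                     # => false
```

Matching is exact per entry, so a current env of `prod` does not match a configured `production`.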
.handle_delayed_items(at_time = nil) ⇒ Object
Handles queueing delayed items. at_time: time to start scheduling items (default: now).
    # File 'lib/resque/scheduler.rb', line 180
    def handle_delayed_items(at_time = nil)
      timestamp = Resque.next_delayed_timestamp(at_time)
      if timestamp
        procline 'Processing Delayed Items'
        until timestamp.nil?
          enqueue_delayed_items_for_timestamp(timestamp)
          timestamp = Resque.next_delayed_timestamp(at_time)
        end
      end
    end
.handle_shutdown ⇒ Object
    # File 'lib/resque/scheduler.rb', line 222
    def handle_shutdown
      exit if @shutdown
      yield
      exit if @shutdown
    end
.handle_signals_with_operation ⇒ Object
    # File 'lib/resque/scheduler.rb', line 364
    def handle_signals_with_operation
      yield if block_given?
      handle_signals
      false
    rescue Interrupt
      before_shutdown if @shutdown
      true
    end
.load_schedule! ⇒ Object
Pulls the schedule from Resque.schedule and loads it into the rufus scheduler instance.
    # File 'lib/resque/scheduler.rb', line 88
    def load_schedule!
      procline 'Loading Schedule'

      # Need to load the schedule from redis for the first time if dynamic
      Resque.reload_schedule! if dynamic

      log! 'Schedule empty! Set Resque.schedule' if Resque.schedule.empty?

      @scheduled_jobs = {}

      Resque.schedule.each do |name, config|
        load_schedule_job(name, config)
      end
      Resque.redis.del(:schedules_changed)
      procline 'Schedules Loaded'
    end
.load_schedule_job(name, config) ⇒ Object
Loads a job schedule into the Rufus::Scheduler and stores it in @scheduled_jobs.
    # File 'lib/resque/scheduler.rb', line 125
    def load_schedule_job(name, config)
      # If `rails_env` or `env` is set in the config, load jobs only if they
      # are meant to be loaded in `Resque::Scheduler.env`. If `rails_env` or
      # `env` is missing, the job should be scheduled regardless of the value
      # of `Resque::Scheduler.env`.

      configured_env = config['rails_env'] || config['env']

      if configured_env.nil? || env_matches?(configured_env)
        log! "Scheduling #{name} "
        interval_defined = false
        interval_types = %w(cron every)
        interval_types.each do |interval_type|
          next unless !config[interval_type].nil? &&
                      config[interval_type].length > 0
          args = optionizate_interval_value(config[interval_type])
          args = [args, nil, job: true] if args.is_a?(::String)

          job = rufus_scheduler.send(interval_type, *args) do
            if master?
              log! "queueing #{config['class']} (#{name})"
              Resque.last_enqueued_at(name, Time.now.to_s)
              enqueue(config)
            end
          end
          @scheduled_jobs[name] = job
          interval_defined = true
          break
        end
        unless interval_defined
          log! "no #{interval_types.join(' / ')} found for " \
               "#{config['class']} (#{name}) - skipping"
        end
      else
        log "Skipping schedule of #{name} because configured " \
            "env #{configured_env.inspect} does not match current " \
            "env #{env.inspect}"
      end
    end
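The loop in `.load_schedule_job` tries `cron` first and `every` second, uses the first key whose value is present and non-empty, and skips the job if neither is set. The sketch below isolates only that selection step. `first_interval` and the sample config hashes are hypothetical and exist purely for illustration.

```ruby
INTERVAL_TYPES = %w[cron every].freeze

# Hypothetical helper: returns [type, value] for the first usable interval
# key, or nil when the config defines neither (the "skipping" case above).
def first_interval(config)
  INTERVAL_TYPES.each do |type|
    value = config[type]
    next if value.nil? || value.length.zero?

    return [type, value]
  end
  nil
end

p first_interval('class' => 'A', 'cron' => '0 3 * * *', 'every' => '1h')
# => ["cron", "0 3 * * *"]
p first_interval('class' => 'B', 'cron' => '', 'every' => '1h')
# => ["every", "1h"]
p first_interval('class' => 'C')
# => nil
```

A config that sets both keys is therefore scheduled by `cron` only; the `every` value is ignored.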
.log(msg) ⇒ Object
    # File 'lib/resque/scheduler.rb', line 393
    def log(msg)
      logger.debug { msg }
    end
.log!(msg) ⇒ Object
    # File 'lib/resque/scheduler.rb', line 385
    def log!(msg)
      logger.info { msg }
    end
.log_error(msg) ⇒ Object
    # File 'lib/resque/scheduler.rb', line 389
    def log_error(msg)
      logger.error { msg }
    end
.optionizate_interval_value(value) ⇒ Object
Converts an interval value into a value with options, if options are available.
    # File 'lib/resque/scheduler.rb', line 106
    def optionizate_interval_value(value)
      args = value
      if args.is_a?(::Array)
        return args.first if args.size > 2 || !args.last.is_a?(::Hash)
        # symbolize keys of hash for options
        args[2] = args[1].reduce({}) do |m, i|
          key, value = i
          m[(key.respond_to?(:to_sym) ? key.to_sym : key) || key] = value
          m
        end

        args[2][:job] = true
        args[1] = nil
      end
      args
    end
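As a rough illustration of what this conversion produces, the sketch below re-implements the same rules as a free function that does not mutate its input (the original modifies the array it is given). The name `optionize_interval` and the sample values are hypothetical.

```ruby
# Hypothetical, non-mutating re-implementation of the rules shown above:
# - a non-Array value is returned unchanged;
# - an Array that has more than two elements, or whose last element is not
#   a Hash, collapses to its first element;
# - [interval, options] becomes [interval, nil, symbolized_options + job: true].
def optionize_interval(value)
  return value unless value.is_a?(Array)
  return value.first if value.size > 2 || !value.last.is_a?(Hash)

  options = value[1].each_with_object({}) do |(key, val), memo|
    memo[key.respond_to?(:to_sym) ? key.to_sym : key] = val
  end
  [value[0], nil, options.merge(job: true)]
end

p optionize_interval('*/5 * * * *')
p optionize_interval(['30s'])
p optionize_interval(['30s', { 'first_in' => '5m' }])
```

The three-element result lines up with how `.load_schedule_job` splats `*args` into the `rufus_scheduler.send(interval_type, *args)` call; a plain String is wrapped there as `[args, nil, job: true]` instead.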
.poll_sleep ⇒ Object
Sleeps and returns true.
    # File 'lib/resque/scheduler.rb', line 331
    def poll_sleep
      handle_shutdown do
        begin
          poll_sleep_loop
        ensure
          @sleeping = false
        end
      end
      true
    end
.poll_sleep_loop ⇒ Object
    # File 'lib/resque/scheduler.rb', line 342
    def poll_sleep_loop
      @sleeping = true
      if poll_sleep_amount > 0
        start = Time.now
        loop do
          elapsed_sleep = (Time.now - start)
          remaining_sleep = poll_sleep_amount - elapsed_sleep
          @do_break = false
          if remaining_sleep <= 0
            @do_break = true
          else
            @do_break = handle_signals_with_operation do
              sleep(remaining_sleep)
            end
          end
          break if @do_break
        end
      else
        handle_signals_with_operation
      end
    end
.print_schedule ⇒ Object
    # File 'lib/resque/scheduler.rb', line 76
    def print_schedule
      if rufus_scheduler
        log! "Scheduling Info\tLast Run"
        scheduler_jobs = rufus_scheduler.jobs
        scheduler_jobs.each do |_k, v|
          log! "#{v.t}\t#{v.last}\t"
        end
      end
    end
.procline(string) ⇒ Object
    # File 'lib/resque/scheduler.rb', line 397
    def procline(string)
      log! string
      argv0 = build_procline(string)
      log "Setting procline #{argv0.inspect}"
      $0 = argv0
    end
.rails_env_matches?(config) ⇒ Boolean
Returns true if the given schedule config hash matches the current env. Deprecated in favor of .env_matches?.
    # File 'lib/resque/scheduler.rb', line 165
    def rails_env_matches?(config)
      warn '`Resque::Scheduler.rails_env_matches?` is deprecated. ' \
           'Please use `Resque::Scheduler.env_matches?` instead.'
      config['rails_env'] && env &&
        config['rails_env'].split(/[\s,]+/).include?(env)
    end
.reload_schedule! ⇒ Object
    # File 'lib/resque/scheduler.rb', line 298
    def reload_schedule!
      procline 'Reloading Schedule'
      clear_schedule!
      load_schedule!
    end
.rufus_scheduler ⇒ Object
    # File 'lib/resque/scheduler.rb', line 285
    def rufus_scheduler
      @rufus_scheduler ||= Rufus::Scheduler.new
    end
.run ⇒ Object
Schedules all jobs and continually looks for delayed jobs (never returns).
    # File 'lib/resque/scheduler.rb', line 35
    def run
      procline 'Starting'

      # trap signals
      register_signal_handlers

      # Quote from the resque/worker.
      # Fix buffering so we can `rake resque:scheduler > scheduler.log` and
      # get output from the child in there.
      $stdout.sync = true
      $stderr.sync = true

      # Load the schedule into rufus
      # If dynamic is set, load that schedule otherwise use normal load
      if dynamic
        reload_schedule!
      else
        load_schedule!
      end

      begin
        @th = Thread.current

        # Now start the scheduling part of the loop.
        loop do
          if master?
            begin
              handle_delayed_items
              update_schedule if dynamic
            rescue Errno::EAGAIN, Errno::ECONNRESET => e
              log! e.message
            end
          end
          poll_sleep
        end

      rescue Interrupt
        log 'Exiting'
      end
    end
.shutdown ⇒ Object
Sets the shutdown flag, cleans schedules, and exits if sleeping.
    # File 'lib/resque/scheduler.rb', line 378
    def shutdown
      return if @shutdown
      @shutdown = true
      log!('Shutting down')
      @th.raise Interrupt if @sleeping
    end
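`.shutdown` wakes the polling thread by raising `Interrupt` in it, which `.handle_signals_with_operation` rescues while the thread is inside `sleep`. The following is a minimal sketch of that wake-up mechanism using only core Ruby. The thread, queue, and return symbols are illustrative assumptions and are not taken from the library.

```ruby
ready = Queue.new

# Illustrative sleeper: stands in for the scheduler thread stored in @th.
sleeper = Thread.new do
  begin
    ready << :sleeping   # signal that we are inside the rescue scope
    sleep 30             # stands in for sleep(remaining_sleep)
    :woke_normally
  rescue Interrupt
    :interrupted         # the branch where the scheduler runs before_shutdown
  end
end

ready.pop                # wait until the sleeper is inside begin/rescue
sleeper.raise Interrupt  # same call shape as `@th.raise Interrupt`
result = sleeper.value   # joins the thread and returns its last expression

p result                 # => :interrupted
```

Raising into the thread cuts the sleep short, so shutdown does not have to wait out the remainder of the poll interval.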
.unschedule_job(name) ⇒ Object
    # File 'lib/resque/scheduler.rb', line 322
    def unschedule_job(name)
      if scheduled_jobs[name]
        log "Removing schedule #{name}"
        scheduled_jobs[name].unschedule
        @scheduled_jobs.delete(name)
      end
    end
.update_schedule ⇒ Object
    # File 'lib/resque/scheduler.rb', line 304
    def update_schedule
      if Resque.redis.scard(:schedules_changed) > 0
        procline 'Updating schedule'
        loop do
          schedule_name = Resque.redis.spop(:schedules_changed)
          break unless schedule_name
          Resque.reload_schedule!
          if Resque.schedule.keys.include?(schedule_name)
            unschedule_job(schedule_name)
            load_schedule_job(schedule_name, Resque.schedule[schedule_name])
          else
            unschedule_job(schedule_name)
          end
        end
        procline 'Schedules Loaded'
      end
    end
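`.update_schedule` drains the `schedules_changed` set one name at a time: each changed job is unscheduled, then loaded again only if it still exists in the current schedule. The sketch below simulates that reconciliation in memory, with a Ruby `Set` standing in for the Redis set and plain Hashes standing in for `Resque.schedule` and `@scheduled_jobs`. All names and sample data are hypothetical.

```ruby
require 'set'

# In-memory simulation (an assumption, not the library's code): pop names
# from `changed` until it is empty, mirroring the spop loop above.
def apply_schedule_changes(changed, schedule, loaded)
  until changed.empty?
    name = changed.first
    changed.delete(name)                                  # like spop
    loaded.delete(name)                                   # like unschedule_job
    loaded[name] = schedule[name] if schedule.key?(name)  # like load_schedule_job
  end
  loaded
end

changed  = Set['nightly', 'retired']
schedule = { 'nightly' => { 'cron' => '0 3 * * *' } }
loaded   = {
  'nightly' => { 'cron' => '0 2 * * *' },
  'retired' => { 'every' => '1h' }
}

apply_schedule_changes(changed, schedule, loaded)
p loaded   # => {"nightly"=>{"cron"=>"0 3 * * *"}}
```

Jobs whose names are not in the changed set are left untouched, which is what separates this path from a full `.reload_schedule!`.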