Module: Resque::Scheduler
- Extended by:
- Configuration, Locking, SignalHandling
- Defined in:
- lib/resque/scheduler.rb,
lib/resque/scheduler/cli.rb,
lib/resque/scheduler/env.rb,
lib/resque/scheduler/util.rb,
lib/resque/scheduler/plugin.rb,
lib/resque/scheduler/server.rb,
lib/resque/scheduler/locking.rb,
lib/resque/scheduler/version.rb,
lib/resque/scheduler/extension.rb,
lib/resque/scheduler/lock/base.rb,
lib/resque/scheduler/lock/basic.rb,
lib/resque/scheduler/configuration.rb,
lib/resque/scheduler/lock/resilient.rb,
lib/resque/scheduler/logger_builder.rb,
lib/resque/scheduler/signal_handling.rb,
lib/resque/scheduler/delaying_extensions.rb,
lib/resque/scheduler/scheduling_extensions.rb
Defined Under Namespace
Modules: Configuration, DelayingExtensions, Extension, Lock, Locking, Plugin, SchedulingExtensions, Server, SignalHandling Classes: Cli, Env, LoggerBuilder, Util
Constant Summary collapse
- CLI_OPTIONS_ENV_MAPPING =
{ app_name: 'APP_NAME', background: 'BACKGROUND', dynamic: 'DYNAMIC_SCHEDULE', env: 'RAILS_ENV', initializer_path: 'INITIALIZER_PATH', logfile: 'LOGFILE', logformat: 'LOGFORMAT', quiet: 'QUIET', pidfile: 'PIDFILE', poll_sleep_amount: 'RESQUE_SCHEDULER_INTERVAL', verbose: 'VERBOSE' }
- VERSION =
'4.0.0'
Class Attribute Summary collapse
-
.scheduled_jobs ⇒ Object
readonly
the Rufus::Scheduler jobs that are scheduled.
Attributes included from Configuration
#app_name, #dynamic, #env, #logfile, #logformat, #poll_sleep_amount, #quiet, #verbose
Attributes included from SignalHandling
Class Method Summary collapse
-
.clear_schedule! ⇒ Object
Stops old rufus scheduler and creates a new one.
-
.enqueue_delayed_items_for_timestamp(timestamp) ⇒ Object
Enqueues all delayed jobs for a timestamp.
-
.enqueue_from_config(job_config) ⇒ Object
Enqueues a job based on a config hash.
-
.env_matches?(configured_env) ⇒ Boolean
Returns true if the current env is non-nil and the configured env (which is a comma-split string) includes the current env.
-
.handle_delayed_items(at_time = nil) ⇒ Object
Handles queueing delayed items at_time - Time to start scheduling items (default: now).
- .handle_errors ⇒ Object
- .handle_shutdown ⇒ Object
-
.load_schedule! ⇒ Object
Pulls the schedule from Resque.schedule and loads it into the rufus scheduler instance.
-
.load_schedule_job(name, config) ⇒ Object
Loads a job schedule into the Rufus::Scheduler and stores it in @scheduled_jobs.
- .log(msg) ⇒ Object
- .log!(msg) ⇒ Object
- .log_error(msg) ⇒ Object
-
.optionizate_interval_value(value) ⇒ Object
modify interval type value to value with options if options available.
-
.poll_sleep ⇒ Object
Sleeps and returns true.
- .poll_sleep_loop ⇒ Object
- .print_schedule ⇒ Object
- .procline(string) ⇒ Object
-
.rails_env_matches?(config) ⇒ Boolean
Returns true if the given schedule config hash matches the current env.
- .reload_schedule! ⇒ Object
- .rufus_scheduler ⇒ Object
-
.run ⇒ Object
Schedule all jobs and continually look for delayed jobs (never returns).
-
.shutdown ⇒ Object
Sets the shutdown flag, clean schedules and exits if sleeping.
- .unschedule_job(name) ⇒ Object
- .update_schedule ⇒ Object
Methods included from Locking
master?, master_lock, release_master_lock, release_master_lock!, supports_lua?
Methods included from Configuration
Methods included from SignalHandling
handle_signals, register_signal_handlers
Class Attribute Details
.scheduled_jobs ⇒ Object (readonly)
the Rufus::Scheduler jobs that are scheduled
26 27 28 |
# File 'lib/resque/scheduler.rb', line 26 def scheduled_jobs @scheduled_jobs end |
Class Method Details
.clear_schedule! ⇒ Object
Stops old rufus scheduler and creates a new one. Returns the new rufus scheduler
283 284 285 286 287 288 |
# File 'lib/resque/scheduler.rb', line 283 def clear_schedule! rufus_scheduler.stop @rufus_scheduler = nil @scheduled_jobs = {} rufus_scheduler end |
.enqueue_delayed_items_for_timestamp(timestamp) ⇒ Object
Enqueues all delayed jobs for a timestamp
189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
# File 'lib/resque/scheduler.rb', line 189 def () item = nil loop do handle_shutdown do # Continually check that it is still the master if master? item = Resque.() if item log "queuing #{item['class']} [delayed]" handle_errors { enqueue_from_config(item) } end end end # continue processing until there are no more ready items in this # timestamp break if item.nil? end end |
.enqueue_from_config(job_config) ⇒ Object
Enqueues a job based on a config hash
221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 |
# File 'lib/resque/scheduler.rb', line 221 def enqueue_from_config(job_config) args = job_config['args'] || job_config[:args] klass_name = job_config['class'] || job_config[:class] begin klass = Resque::Scheduler::Util.constantize(klass_name) rescue NameError klass = klass_name end params = args.is_a?(Hash) ? [args] : Array(args) queue = job_config['queue'] || job_config[:queue] || Resque.queue_from_class(klass) # Support custom job classes like those that inherit from # Resque::JobWithStatus (resque-status) job_klass = job_config['custom_job_class'] if job_klass && job_klass != 'Resque::Job' # The custom job class API must offer a static "scheduled" method. If # the custom job class can not be constantized (via a requeue call # from the web perhaps), fall back to enqueing normally via # Resque::Job.create. begin Resque::Scheduler::Util.constantize(job_klass).scheduled( queue, klass_name, *params ) rescue NameError # Note that the custom job class (job_config['custom_job_class']) # is the one enqueued Resque::Job.create(queue, job_klass, *params) end else # Hack to avoid havoc for people shoving stuff into queues # for non-existent classes (for example: running scheduler in # one app that schedules for another. if Class === klass Resque::Scheduler::Plugin.run_before_delayed_enqueue_hooks( klass, *params ) # If the class is a custom job class, call self#scheduled on it. # This allows you to do things like Resque.enqueue_at(timestamp, # CustomJobClass). Otherwise, pass off to Resque. if klass.respond_to?(:scheduled) klass.scheduled(queue, klass_name, *params) else Resque.enqueue_to(queue, klass, *params) end else # This will not run the before_hooks in rescue, but will at least # queue the job. Resque::Job.create(queue, klass, *params) end end end |
.env_matches?(configured_env) ⇒ Boolean
Returns true if the current env is non-nil and the configured env (which is a comma-split string) includes the current env.
171 172 173 |
# File 'lib/resque/scheduler.rb', line 171 def env_matches?(configured_env) env && configured_env.split(/[\s,]+/).include?(env) end |
.handle_delayed_items(at_time = nil) ⇒ Object
Handles queueing delayed items at_time - Time to start scheduling items (default: now).
177 178 179 180 181 182 183 184 185 186 |
# File 'lib/resque/scheduler.rb', line 177 def handle_delayed_items(at_time = nil) = Resque.(at_time) if procline 'Processing Delayed Items' until .nil? () = Resque.(at_time) end end end |
.handle_errors ⇒ Object
214 215 216 217 218 |
# File 'lib/resque/scheduler.rb', line 214 def handle_errors yield rescue => e log_error "#{e.class.name}: #{e.}" end |
.handle_shutdown ⇒ Object
208 209 210 211 212 |
# File 'lib/resque/scheduler.rb', line 208 def handle_shutdown exit if @shutdown yield exit if @shutdown end |
.load_schedule! ⇒ Object
Pulls the schedule from Resque.schedule and loads it into the rufus scheduler instance
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/resque/scheduler.rb', line 82 def load_schedule! procline 'Loading Schedule' # Need to load the schedule from redis for the first time if dynamic Resque.reload_schedule! if dynamic log! 'Schedule empty! Set Resque.schedule' if Resque.schedule.empty? @scheduled_jobs = {} Resque.schedule.each do |name, config| load_schedule_job(name, config) end Resque.redis.del(:schedules_changed) procline 'Schedules Loaded' end |
.load_schedule_job(name, config) ⇒ Object
Loads a job schedule into the Rufus::Scheduler and stores it in @scheduled_jobs
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/resque/scheduler.rb', line 119 def load_schedule_job(name, config) # If `rails_env` or `env` is set in the config, load jobs only if they # are meant to be loaded in `Resque::Scheduler.env`. If `rails_env` or # `env` is missing, the job should be scheduled regardless of the value # of `Resque::Scheduler.env`. configured_env = config['rails_env'] || config['env'] if configured_env.nil? || env_matches?(configured_env) log! "Scheduling #{name} " interval_defined = false interval_types = %w(cron every) interval_types.each do |interval_type| if !config[interval_type].nil? && config[interval_type].length > 0 args = optionizate_interval_value(config[interval_type]) if args.is_a?(::String) args = [args, nil, job: true] end job = rufus_scheduler.send(interval_type, *args) do if master? log! "queueing #{config['class']} (#{name})" Resque.last_enqueued_at(name, Time.now.to_s) handle_errors { enqueue_from_config(config) } end end @scheduled_jobs[name] = job interval_defined = true break end end unless interval_defined log! "no #{interval_types.join(' / ')} found for " \ "#{config['class']} (#{name}) - skipping" end else log "Skipping schedule of #{name} because configured " \ "env #{configured_env.inspect} does not match current " \ "env #{env.inspect}" end end |
.log(msg) ⇒ Object
370 371 372 |
# File 'lib/resque/scheduler.rb', line 370 def log(msg) logger.debug { msg } end |
.log!(msg) ⇒ Object
362 363 364 |
# File 'lib/resque/scheduler.rb', line 362 def log!(msg) logger.info { msg } end |
.log_error(msg) ⇒ Object
366 367 368 |
# File 'lib/resque/scheduler.rb', line 366 def log_error(msg) logger.error { msg } end |
.optionizate_interval_value(value) ⇒ Object
modify interval type value to value with options if options available
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/resque/scheduler.rb', line 100 def optionizate_interval_value(value) args = value if args.is_a?(::Array) return args.first if args.size > 2 || !args.last.is_a?(::Hash) # symbolize keys of hash for options args[2] = args[1].reduce({}) do |m, i| key, value = i m[(key.respond_to?(:to_sym) ? key.to_sym : key) || key] = value m end args[2][:job] = true args[1] = nil end args end |
.poll_sleep ⇒ Object
Sleeps and returns true
323 324 325 326 327 328 329 330 331 332 |
# File 'lib/resque/scheduler.rb', line 323 def poll_sleep handle_shutdown do begin poll_sleep_loop ensure @sleeping = false end end true end |
.poll_sleep_loop ⇒ Object
334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 |
# File 'lib/resque/scheduler.rb', line 334 def poll_sleep_loop @sleeping = true start = Time.now loop do elapsed_sleep = (Time.now - start) remaining_sleep = poll_sleep_amount - elapsed_sleep break if remaining_sleep <= 0 begin sleep(remaining_sleep) handle_signals rescue Interrupt if @shutdown Resque.clean_schedules release_master_lock end break end end end |
.print_schedule ⇒ Object
70 71 72 73 74 75 76 77 78 |
# File 'lib/resque/scheduler.rb', line 70 def print_schedule if rufus_scheduler log! "Scheduling Info\tLast Run" scheduler_jobs = rufus_scheduler.jobs scheduler_jobs.each do |_k, v| log! "#{v.t}\t#{v.last}\t" end end end |
.procline(string) ⇒ Object
374 375 376 377 378 379 |
# File 'lib/resque/scheduler.rb', line 374 def procline(string) log! string argv0 = build_procline(string) log "Setting procline #{argv0.inspect}" $0 = argv0 end |
.rails_env_matches?(config) ⇒ Boolean
Returns true if the given schedule config hash matches the current env
162 163 164 165 166 167 |
# File 'lib/resque/scheduler.rb', line 162 def rails_env_matches?(config) warn '`Resque::Scheduler.rails_env_matches?` is deprecated. ' \ 'Please use `Resque::Scheduler.env_matches?` instead.' config['rails_env'] && env && config['rails_env'].split(/[\s,]+/).include?(env) end |
.reload_schedule! ⇒ Object
290 291 292 293 294 |
# File 'lib/resque/scheduler.rb', line 290 def reload_schedule! procline 'Reloading Schedule' clear_schedule! load_schedule! end |
.rufus_scheduler ⇒ Object
277 278 279 |
# File 'lib/resque/scheduler.rb', line 277 def rufus_scheduler @rufus_scheduler ||= Rufus::Scheduler.new end |
.run ⇒ Object
Schedule all jobs and continually look for delayed jobs (never returns)
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/resque/scheduler.rb', line 29 def run procline 'Starting' # trap signals register_signal_handlers # Quote from the resque/worker. # Fix buffering so we can `rake resque:scheduler > scheduler.log` and # get output from the child in there. $stdout.sync = true $stderr.sync = true # Load the schedule into rufus # If dynamic is set, load that schedule otherwise use normal load if dynamic reload_schedule! else load_schedule! end begin @th = Thread.current # Now start the scheduling part of the loop. loop do if master? begin handle_delayed_items update_schedule if dynamic rescue Errno::EAGAIN, Errno::ECONNRESET => e log! e. end end poll_sleep end rescue Interrupt log 'Exiting' end end |
.shutdown ⇒ Object
Sets the shutdown flag, clean schedules and exits if sleeping
355 356 357 358 359 360 |
# File 'lib/resque/scheduler.rb', line 355 def shutdown return if @shutdown @shutdown = true log!('Shutting down') @th.raise Interrupt if @sleeping end |
.unschedule_job(name) ⇒ Object
314 315 316 317 318 319 320 |
# File 'lib/resque/scheduler.rb', line 314 def unschedule_job(name) if scheduled_jobs[name] log "Removing schedule #{name}" scheduled_jobs[name].unschedule @scheduled_jobs.delete(name) end end |
.update_schedule ⇒ Object
296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 |
# File 'lib/resque/scheduler.rb', line 296 def update_schedule if Resque.redis.scard(:schedules_changed) > 0 procline 'Updating schedule' loop do schedule_name = Resque.redis.spop(:schedules_changed) break unless schedule_name Resque.reload_schedule! if Resque.schedule.keys.include?(schedule_name) unschedule_job(schedule_name) load_schedule_job(schedule_name, Resque.schedule[schedule_name]) else unschedule_job(schedule_name) end end procline 'Schedules Loaded' end end |