Class: MiniScheduler::Manager
- Inherits:
-
Object
- Object
- MiniScheduler::Manager
- Defined in:
- lib/mini_scheduler/manager.rb
Defined Under Namespace
Classes: Runner
Constant Summary collapse
- @@identity_key_mutex =
Mutex.new
Instance Attribute Summary collapse
-
#enable_stats ⇒ Object
Returns the value of attribute enable_stats.
-
#queue ⇒ Object
Returns the value of attribute queue.
-
#random_ratio ⇒ Object
Returns the value of attribute random_ratio.
-
#redis ⇒ Object
Returns the value of attribute redis.
-
#workers ⇒ Object
Returns the value of attribute workers.
Class Method Summary collapse
- .current ⇒ Object
- .discover_queues ⇒ Object
- .discover_schedules ⇒ Object
- .lock_key(queue) ⇒ Object
- .queue_key(queue, hostname = nil) ⇒ Object
- .schedule_key(klass, hostname = nil) ⇒ Object
- .seq ⇒ Object
- .without_runner ⇒ Object
Instance Method Summary collapse
- #blocking_tick ⇒ Object
- #ensure_schedule!(klass) ⇒ Object
- #get_klass(name) ⇒ Object
- #hostname ⇒ Object
- #identity_key ⇒ Object
-
#initialize(options = nil) ⇒ Manager
constructor
A new instance of Manager.
- #keep_alive(*ids) ⇒ Object
- #keep_alive_duration ⇒ Object
- #lock ⇒ Object
- #next_run(klass) ⇒ Object
- #remove(klass) ⇒ Object
- #repair_queue ⇒ Object
- #reschedule_orphans! ⇒ Object
- #reschedule_orphans_on!(hostname = nil) ⇒ Object
- #schedule_info(klass) ⇒ Object
- #schedule_next_job(hostname = nil) ⇒ Object
- #stop! ⇒ Object
- #tick ⇒ Object
Constructor Details
#initialize(options = nil) ⇒ Manager
Returns a new instance of Manager.
211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 |
# File 'lib/mini_scheduler/manager.rb', line 211
#
# Builds a manager bound to a single queue.
#
# @param options [Hash, nil] optional settings. Keys read here:
#   :queue        - queue name, falls back to "default"
#   :workers      - worker count, falls back to 1
#   :skip_runner  - when truthy, no Runner is created and the manager is
#                   not registered in Manager.current
#   :hostname     - explicit hostname (otherwise resolved lazily by #hostname)
#   :enable_stats - explicit stats switch; when the key is absent, stats are
#                   enabled only if MiniScheduler::Stat is defined
def initialize(options = nil)
  @queue = options && options[:queue] || "default"
  @workers = options && options[:workers] || 1
  @redis = MiniScheduler.redis
  @random_ratio = 0.1

  unless options && options[:skip_runner]
    @runner = Runner.new(self)
    self.class.current[@queue] = self
  end

  @hostname = options && options[:hostname]
  @manager_id = SecureRandom.hex

  # key? (not a truthiness check) so an explicit `enable_stats: false` is honoured.
  if options && options.key?(:enable_stats)
    @enable_stats = options[:enable_stats]
  else
    @enable_stats = !!defined?(MiniScheduler::Stat)
  end
end
Instance Attribute Details
#enable_stats ⇒ Object
Returns the value of attribute enable_stats.
5 6 7 |
# File 'lib/mini_scheduler/manager.rb', line 5
#
# Returns the value of attribute enable_stats.
def enable_stats
  @enable_stats
end
#queue ⇒ Object
Returns the value of attribute queue.
5 6 7 |
# File 'lib/mini_scheduler/manager.rb', line 5
#
# Returns the value of attribute queue.
def queue
  @queue
end
#random_ratio ⇒ Object
Returns the value of attribute random_ratio.
5 6 7 |
# File 'lib/mini_scheduler/manager.rb', line 5
#
# Returns the value of attribute random_ratio.
def random_ratio
  @random_ratio
end
#redis ⇒ Object
Returns the value of attribute redis.
5 6 7 |
# File 'lib/mini_scheduler/manager.rb', line 5
#
# Returns the value of attribute redis.
def redis
  @redis
end
#workers ⇒ Object
Returns the value of attribute workers.
5 6 7 |
# File 'lib/mini_scheduler/manager.rb', line 5
#
# Returns the value of attribute workers.
def workers
  @workers
end
Class Method Details
.current ⇒ Object
231 232 233 |
# File 'lib/mini_scheduler/manager.rb', line 231
#
# Lazily created, memoized Hash held on the receiver. #initialize stores
# managers in it keyed by queue name and #stop! deletes them.
#
# @return [Hash]
def self.current
  @current ||= {}
end
.discover_queues ⇒ Object
359 360 361 |
# File 'lib/mini_scheduler/manager.rb', line 359
#
# Collects the distinct `queue` values of every live object that is a
# MiniScheduler::Schedule.
#
# @return [Set] unique queue values
def self.discover_queues
  ObjectSpace.each_object(MiniScheduler::Schedule).map(&:queue).to_set
end
.discover_schedules ⇒ Object
363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 |
# File 'lib/mini_scheduler/manager.rb', line 363
#
# Lists every live MiniScheduler::Schedule object that reports
# `scheduled?`, keeping only the first one seen for each `to_s` name.
#
# @return [Array] scheduled schedule objects, unique by name
def self.discover_schedules
  # hack for development reloader is crazytown
  # multiple classes with same name can be in
  # object space
  seen = Set.new
  schedules = []

  ObjectSpace.each_object(MiniScheduler::Schedule) do |schedule|
    next unless schedule.scheduled?

    # Set#add? returns nil when the name was already recorded.
    schedules << schedule if seen.add?(schedule.to_s)
  end

  schedules
end
.lock_key(queue) ⇒ Object
395 396 397 |
# File 'lib/mini_scheduler/manager.rb', line 395
#
# Builds the lock key string for a queue.
#
# @param queue [#to_s] queue name
# @return [String] "_scheduler_lock_<queue>_"
def self.lock_key(queue)
  "_scheduler_lock_#{queue}_"
end
.queue_key(queue, hostname = nil) ⇒ Object
399 400 401 402 403 404 405 |
# File 'lib/mini_scheduler/manager.rb', line 399
#
# Builds the queue key string, optionally scoped to a host.
#
# @param queue [#to_s] queue name
# @param hostname [#to_s, nil] when truthy, appended as a host segment
# @return [String]
def self.queue_key(queue, hostname = nil)
  if hostname
    "_scheduler_queue_#{queue}_#{hostname}_"
  else
    "_scheduler_queue_#{queue}_"
  end
end
.schedule_key(klass, hostname = nil) ⇒ Object
407 408 409 410 411 412 413 |
# File 'lib/mini_scheduler/manager.rb', line 407
#
# Builds the schedule key string for a job class, optionally scoped to a host.
#
# @param klass [#to_s] job class (interpolated via to_s)
# @param hostname [#to_s, nil] when truthy, appended as a host suffix
# @return [String]
def self.schedule_key(klass, hostname = nil)
  if hostname
    "_scheduler_#{klass}_#{hostname}"
  else
    "_scheduler_#{klass}"
  end
end
.seq ⇒ Object
380 381 382 383 384 385 |
# File 'lib/mini_scheduler/manager.rb', line 380
#
# Returns the next value of a counter that starts at 1 and increases by 1
# per call; the increment runs inside @class_mutex.
#
# NOTE(review): @class_mutex is not initialised in this listing —
# presumably assigned at class level just above; confirm.
#
# @return [Integer]
def self.seq
  @class_mutex.synchronize do
    @i ||= 0
    @i += 1
  end
end
.without_runner ⇒ Object
207 208 209 |
# File 'lib/mini_scheduler/manager.rb', line 207
#
# Convenience constructor: builds an instance with `skip_runner: true`.
def self.without_runner
  new(skip_runner: true)
end
Instance Method Details
#blocking_tick ⇒ Object
332 333 334 335 |
# File 'lib/mini_scheduler/manager.rb', line 332
#
# Runs one #tick and then calls `wait_till_done` on the runner.
def blocking_tick
  tick
  @runner.wait_till_done
end
#ensure_schedule!(klass) ⇒ Object
251 252 253 254 255 |
# File 'lib/mini_scheduler/manager.rb', line 251
#
# Calls `schedule!` on the schedule info for +klass+ inside #lock.
#
# @param klass [Object] job class handed to #schedule_info
def ensure_schedule!(klass)
  lock do
    schedule_info(klass).schedule!
  end
end
#get_klass(name) ⇒ Object
285 286 287 288 289 |
# File 'lib/mini_scheduler/manager.rb', line 285
#
# Resolves a name to a constant via `constantize`.
#
# @param name [#constantize] constant name
# @return [Object, nil] the resolved constant, or nil when NameError is raised
def get_klass(name)
  name.constantize
rescue NameError
  nil
end
#hostname ⇒ Object
235 236 237 238 239 240 241 |
# File 'lib/mini_scheduler/manager.rb', line 235
#
# Memoized hostname. When @hostname is not already set, shells out to the
# `hostname` command and strips the output; any StandardError raised while
# doing so yields "unknown".
#
# @return [String]
def hostname
  @hostname ||=
    begin
      `hostname`.strip
    rescue
      "unknown"
    end
end
#identity_key ⇒ Object
388 389 390 391 392 393 |
# File 'lib/mini_scheduler/manager.rb', line 388
#
# Memoized identity string for this manager, built from the hostname, the
# process id, the class-level sequence number and a random hex token.
#
# @return [String]
def identity_key
  return @identity_key if @identity_key

  # Assignment happens under the shared mutex; ||= re-checks inside it so
  # the key is only built once.
  @@identity_key_mutex.synchronize do
    @identity_key ||= "_scheduler_#{hostname}:#{Process.pid}:#{self.class.seq}:#{SecureRandom.hex}"
  end
end
#keep_alive(*ids) ⇒ Object
346 347 348 349 350 351 |
# File 'lib/mini_scheduler/manager.rb', line 346
#
# Writes each id to redis with an empty value and an expiry of
# #keep_alive_duration seconds (SETEX).
#
# @param ids [Array<String>] keys to refresh; when none are given, this
#   manager's #identity_key plus the runner's worker_thread_ids are used
def keep_alive(*ids)
  ids = [identity_key, *@runner.worker_thread_ids] if ids.empty?

  # Block parameter deliberately not named identity_key, to avoid shadowing
  # the method of the same name.
  ids.each do |id|
    redis.setex id, keep_alive_duration, ""
  end
end
#keep_alive_duration ⇒ Object
342 343 344 |
# File 'lib/mini_scheduler/manager.rb', line 342
#
# Expiry passed to SETEX by #keep_alive.
#
# @return [Integer] 60
def keep_alive_duration
  60
end
#lock ⇒ Object
353 354 355 356 357 |
# File 'lib/mini_scheduler/manager.rb', line 353
#
# Yields inside MiniScheduler::DistributedMutex.synchronize, keyed by this
# queue's lock key. Note it passes MiniScheduler.redis, not the #redis
# attribute.
#
# @yield the caller's block
def lock
  MiniScheduler::DistributedMutex.synchronize(Manager.lock_key(queue), MiniScheduler.redis) do
    yield
  end
end
#next_run(klass) ⇒ Object
247 248 249 |
# File 'lib/mini_scheduler/manager.rb', line 247
#
# Returns `next_run` from the schedule info for +klass+.
#
# @param klass [Object] job class handed to #schedule_info
def next_run(klass)
  schedule_info(klass).next_run
end
#remove(klass) ⇒ Object
257 258 259 260 261 |
# File 'lib/mini_scheduler/manager.rb', line 257
#
# Calls `del!` on the schedule info for +klass+ inside #lock.
#
# @param klass [Object] job class handed to #schedule_info
def remove(klass)
  lock do
    schedule_info(klass).del!
  end
end
#repair_queue ⇒ Object
291 292 293 294 295 296 297 298 |
# File 'lib/mini_scheduler/manager.rb', line 291
#
# Re-registers every discovered schedule that belongs to this queue, but
# only when neither the global queue key nor this host's queue key exists
# in redis.
def repair_queue
  return if redis.exists?(self.class.queue_key(queue)) ||
    redis.exists?(self.class.queue_key(queue, hostname))

  self.class.discover_schedules
    .select { |schedule| schedule.queue == queue }
    .each { |schedule| ensure_schedule!(schedule) }
end
#reschedule_orphans! ⇒ Object
263 264 265 266 267 268 |
# File 'lib/mini_scheduler/manager.rb', line 263
#
# Inside #lock, runs #reschedule_orphans_on! first with no hostname and
# then with this manager's hostname.
def reschedule_orphans!
  lock do
    reschedule_orphans_on!
    reschedule_orphans_on!(hostname)
  end
end
#reschedule_orphans_on!(hostname = nil) ⇒ Object
270 271 272 273 274 275 276 277 278 279 280 281 282 283 |
# File 'lib/mini_scheduler/manager.rb', line 270
#
# Walks every member of the queue's sorted set and marks as 'ORPHAN' (due
# now) each job whose previous result is 'QUEUED' or 'RUNNING' but whose
# owner is blank or has no value in redis.
#
# @param hostname [String, nil] selects the per-host queue key when given
def reschedule_orphans_on!(hostname = nil)
  redis.zrange(Manager.queue_key(queue, hostname), 0, -1).each do |key|
    klass = get_klass(key)
    # Skip members whose name no longer resolves to a constant.
    next unless klass

    info = schedule_info(klass)

    if ['QUEUED', 'RUNNING'].include?(info.prev_result) &&
        (info.current_owner.blank? || !redis.get(info.current_owner))
      info.prev_result = 'ORPHAN'
      info.next_run = Time.now.to_i
      info.write!
    end
  end
end
#schedule_info(klass) ⇒ Object
243 244 245 |
# File 'lib/mini_scheduler/manager.rb', line 243
#
# Builds a MiniScheduler::ScheduleInfo for +klass+ bound to this manager.
#
# @param klass [Object] job class
# @return [MiniScheduler::ScheduleInfo]
def schedule_info(klass)
  MiniScheduler::ScheduleInfo.new(klass, self)
end
#schedule_next_job(hostname = nil) ⇒ Object
307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 |
# File 'lib/mini_scheduler/manager.rb', line 307
#
# Looks at the first member of the queue's sorted set and, if its score is
# not in the future, marks it QUEUED under this manager's identity and
# enqueues it on the runner.
#
# @param hostname [String, nil] selects the per-host queue key when given
# @return [Object, nil] result of `@runner.enq`, or nil when nothing was queued
def schedule_next_job(hostname = nil)
  (key, due), _ = redis.zrange Manager.queue_key(queue, hostname), 0, 0, withscores: true
  return unless key
  # Not due yet.
  return if due.to_i > Time.now.to_i

  klass = get_klass(key)
  if !klass || (
    (klass.is_per_host && !hostname) || (hostname && !klass.is_per_host)
  )
    # corrupt key, nuke it (renamed job or something)
    redis.zrem Manager.queue_key(queue, hostname), key
    return
  end

  info = schedule_info(klass)
  info.prev_run = Time.now.to_i
  info.prev_result = "QUEUED"
  info.prev_duration = -1
  info.next_run = nil
  info.current_owner = identity_key
  info.schedule!
  @runner.enq(klass)
end
#stop! ⇒ Object
337 338 339 340 |
# File 'lib/mini_scheduler/manager.rb', line 337
#
# Stops the runner and removes this queue's entry from Manager.current.
#
# NOTE(review): @runner is only assigned when :skip_runner was not passed
# to #initialize; calling this on such a manager looks like it would hit
# nil — confirm.
def stop!
  @runner.stop!
  self.class.current.delete(@queue)
end
#tick ⇒ Object
300 301 302 303 304 305 |
# File 'lib/mini_scheduler/manager.rb', line 300
#
# Inside #lock, runs #schedule_next_job first with no hostname and then
# with this manager's hostname.
def tick
  lock do
    schedule_next_job
    schedule_next_job(hostname)
  end
end