Class: MiniScheduler::Manager
- Inherits:
-
Object
- Object
- MiniScheduler::Manager
- Defined in:
- lib/mini_scheduler/manager.rb
Defined Under Namespace
Classes: Runner
Instance Attribute Summary collapse
-
#enable_stats ⇒ Object
Returns the value of attribute enable_stats.
-
#random_ratio ⇒ Object
Returns the value of attribute random_ratio.
-
#redis ⇒ Object
Returns the value of attribute redis.
Class Method Summary collapse
- .current ⇒ Object
- .current=(manager) ⇒ Object
- .discover_schedules ⇒ Object
- .lock_key ⇒ Object
- .queue_key(hostname = nil) ⇒ Object
- .schedule_key(klass, hostname = nil) ⇒ Object
- .seq ⇒ Object
- .without_runner ⇒ Object
Instance Method Summary collapse
- #blocking_tick ⇒ Object
- #ensure_schedule!(klass) ⇒ Object
- #get_klass(name) ⇒ Object
- #hostname ⇒ Object
- #identity_key ⇒ Object
-
#initialize(options = nil) ⇒ Manager
constructor
A new instance of Manager.
- #keep_alive ⇒ Object
- #keep_alive_duration ⇒ Object
- #lock ⇒ Object
- #next_run(klass) ⇒ Object
- #remove(klass) ⇒ Object
- #reschedule_orphans! ⇒ Object
- #reschedule_orphans_on!(hostname = nil) ⇒ Object
- #schedule_info(klass) ⇒ Object
- #schedule_next_job(hostname = nil) ⇒ Object
- #stop! ⇒ Object
- #tick ⇒ Object
Constructor Details
#initialize(options = nil) ⇒ Manager
Returns a new instance of Manager.
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
# File 'lib/mini_scheduler/manager.rb', line 169 def initialize( = nil) @redis = MiniScheduler.redis @random_ratio = 0.1 unless && [:skip_runner] @runner = Runner.new(self) self.class.current = self end @hostname = && [:hostname] @manager_id = SecureRandom.hex if && .key?(:enable_stats) @enable_stats = [:enable_stats] else @enable_stats = true # doesn't work !!defined?(MiniScheduler::Stat) end end |
Instance Attribute Details
#enable_stats ⇒ Object
Returns the value of attribute enable_stats.
3 4 5 |
# File 'lib/mini_scheduler/manager.rb', line 3 def enable_stats @enable_stats end |
#random_ratio ⇒ Object
Returns the value of attribute random_ratio.
3 4 5 |
# File 'lib/mini_scheduler/manager.rb', line 3 def random_ratio @random_ratio end |
#redis ⇒ Object
Returns the value of attribute redis.
3 4 5 |
# File 'lib/mini_scheduler/manager.rb', line 3 def redis @redis end |
Class Method Details
.current ⇒ Object
187 188 189 |
# File 'lib/mini_scheduler/manager.rb', line 187 def self.current @current end |
.current=(manager) ⇒ Object
191 192 193 |
# File 'lib/mini_scheduler/manager.rb', line 191 def self.current=(manager) @current = manager end |
.discover_schedules ⇒ Object
300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 |
# File 'lib/mini_scheduler/manager.rb', line 300 def self.discover_schedules # hack for developemnt reloader is crazytown # multiple classes with same name can be in # object space unique = Set.new schedules = [] ObjectSpace.each_object(MiniScheduler::Schedule) do |schedule| if schedule.scheduled? next if unique.include?(schedule.to_s) schedules << schedule unique << schedule.to_s end end schedules end |
.lock_key ⇒ Object
328 329 330 |
# File 'lib/mini_scheduler/manager.rb', line 328 def self.lock_key "_scheduler_lock_" end |
.queue_key(hostname = nil) ⇒ Object
332 333 334 335 336 337 338 |
# File 'lib/mini_scheduler/manager.rb', line 332 def self.queue_key(hostname = nil) if hostname "_scheduler_queue_#{hostname}_" else "_scheduler_queue_" end end |
.schedule_key(klass, hostname = nil) ⇒ Object
340 341 342 343 344 345 346 |
# File 'lib/mini_scheduler/manager.rb', line 340 def self.schedule_key(klass, hostname = nil) if hostname "_scheduler_#{klass}_#{hostname}" else "_scheduler_#{klass}" end end |
.seq ⇒ Object
317 318 319 320 321 322 |
# File 'lib/mini_scheduler/manager.rb', line 317 def self.seq @mutex.synchronize do @i ||= 0 @i += 1 end end |
.without_runner ⇒ Object
165 166 167 |
# File 'lib/mini_scheduler/manager.rb', line 165 def self.without_runner self.new(skip_runner: true) end |
Instance Method Details
#blocking_tick ⇒ Object
276 277 278 279 |
# File 'lib/mini_scheduler/manager.rb', line 276 def blocking_tick tick @runner.wait_till_done end |
#ensure_schedule!(klass) ⇒ Object
207 208 209 210 211 |
# File 'lib/mini_scheduler/manager.rb', line 207 def ensure_schedule!(klass) lock do schedule_info(klass).schedule! end end |
#get_klass(name) ⇒ Object
241 242 243 244 245 |
# File 'lib/mini_scheduler/manager.rb', line 241 def get_klass(name) name.constantize rescue NameError nil end |
#hostname ⇒ Object
195 196 197 |
# File 'lib/mini_scheduler/manager.rb', line 195 def hostname @hostname ||= `hostname`.strip end |
#identity_key ⇒ Object
324 325 326 |
# File 'lib/mini_scheduler/manager.rb', line 324 def identity_key @identity_key ||= "_scheduler_#{hostname}:#{Process.pid}:#{self.class.seq}:#{SecureRandom.hex}" end |
#keep_alive ⇒ Object
290 291 292 |
# File 'lib/mini_scheduler/manager.rb', line 290 def keep_alive redis.setex identity_key, keep_alive_duration, "" end |
#keep_alive_duration ⇒ Object
286 287 288 |
# File 'lib/mini_scheduler/manager.rb', line 286 def keep_alive_duration 60 end |
#lock ⇒ Object
294 295 296 297 298 |
# File 'lib/mini_scheduler/manager.rb', line 294 def lock MiniScheduler::DistributedMutex.synchronize(Manager.lock_key, MiniScheduler.redis) do yield end end |
#next_run(klass) ⇒ Object
203 204 205 |
# File 'lib/mini_scheduler/manager.rb', line 203 def next_run(klass) schedule_info(klass).next_run end |
#remove(klass) ⇒ Object
213 214 215 216 217 |
# File 'lib/mini_scheduler/manager.rb', line 213 def remove(klass) lock do schedule_info(klass).del! end end |
#reschedule_orphans! ⇒ Object
219 220 221 222 223 224 |
# File 'lib/mini_scheduler/manager.rb', line 219 def reschedule_orphans! lock do reschedule_orphans_on! reschedule_orphans_on!(hostname) end end |
#reschedule_orphans_on!(hostname = nil) ⇒ Object
226 227 228 229 230 231 232 233 234 235 236 237 238 239 |
# File 'lib/mini_scheduler/manager.rb', line 226 def reschedule_orphans_on!(hostname = nil) redis.zrange(Manager.queue_key(hostname), 0, -1).each do |key| klass = get_klass(key) next unless klass info = schedule_info(klass) if ['QUEUED', 'RUNNING'].include?(info.prev_result) && (info.current_owner.blank? || !redis.get(info.current_owner)) info.prev_result = 'ORPHAN' info.next_run = Time.now.to_i info.write! end end end |
#schedule_info(klass) ⇒ Object
199 200 201 |
# File 'lib/mini_scheduler/manager.rb', line 199 def schedule_info(klass) MiniScheduler::ScheduleInfo.new(klass, self) end |
#schedule_next_job(hostname = nil) ⇒ Object
254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 |
# File 'lib/mini_scheduler/manager.rb', line 254 def schedule_next_job(hostname = nil) (key, due), _ = redis.zrange Manager.queue_key(hostname), 0, 0, withscores: true return unless key if due.to_i <= Time.now.to_i klass = get_klass(key) unless klass # corrupt key, nuke it (renamed job or something) redis.zrem Manager.queue_key(hostname), key return end info = schedule_info(klass) info.prev_run = Time.now.to_i info.prev_result = "QUEUED" info.prev_duration = -1 info.next_run = nil info.current_owner = identity_key info.schedule! @runner.enq(klass) end end |
#stop! ⇒ Object
281 282 283 284 |
# File 'lib/mini_scheduler/manager.rb', line 281 def stop! @runner.stop! self.class.current = nil end |
#tick ⇒ Object
247 248 249 250 251 252 |
# File 'lib/mini_scheduler/manager.rb', line 247 def tick lock do schedule_next_job schedule_next_job(hostname) end end |