Class: MiniScheduler::Manager

Inherits:
Object
  • Object
show all
Defined in:
lib/mini_scheduler/manager.rb

Defined Under Namespace

Classes: Runner

Constant Summary collapse

@@identity_key_mutex =
Mutex.new

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = nil) ⇒ Manager

Returns a new instance of Manager.



211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/mini_scheduler/manager.rb', line 211

def initialize(options = nil)
  @queue = options && options[:queue] || "default"
  @workers = options && options[:workers] || 1
  @redis = MiniScheduler.redis
  @random_ratio = 0.1
  unless options && options[:skip_runner]
    @runner = Runner.new(self)
    self.class.current[@queue] = self
  end

  @hostname = options && options[:hostname]
  @manager_id = SecureRandom.hex

  if options && options.key?(:enable_stats)
    @enable_stats = options[:enable_stats]
  else
    @enable_stats = !!defined?(MiniScheduler::Stat)
  end
end

Instance Attribute Details

#enable_statsObject

Returns the value of attribute enable_stats.



5
6
7
# File 'lib/mini_scheduler/manager.rb', line 5

def enable_stats
  @enable_stats
end

#queueObject

Returns the value of attribute queue.



5
6
7
# File 'lib/mini_scheduler/manager.rb', line 5

def queue
  @queue
end

#random_ratioObject

Returns the value of attribute random_ratio.



5
6
7
# File 'lib/mini_scheduler/manager.rb', line 5

def random_ratio
  @random_ratio
end

#redisObject

Returns the value of attribute redis.



5
6
7
# File 'lib/mini_scheduler/manager.rb', line 5

def redis
  @redis
end

#workersObject

Returns the value of attribute workers.



5
6
7
# File 'lib/mini_scheduler/manager.rb', line 5

def workers
  @workers
end

Class Method Details

.currentObject



231
232
233
# File 'lib/mini_scheduler/manager.rb', line 231

def self.current
  @current ||= {}
end

.discover_queuesObject



359
360
361
# File 'lib/mini_scheduler/manager.rb', line 359

def self.discover_queues
  ObjectSpace.each_object(MiniScheduler::Schedule).map(&:queue).to_set
end

.discover_schedulesObject



363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
# File 'lib/mini_scheduler/manager.rb', line 363

def self.discover_schedules
  # hack for developemnt reloader is crazytown
  # multiple classes with same name can be in
  # object space
  unique = Set.new
  schedules = []
  ObjectSpace.each_object(MiniScheduler::Schedule) do |schedule|
    if schedule.scheduled?
      next if unique.include?(schedule.to_s)
      schedules << schedule
      unique << schedule.to_s
    end
  end
  schedules
end

.lock_key(queue) ⇒ Object



395
396
397
# File 'lib/mini_scheduler/manager.rb', line 395

def self.lock_key(queue)
  "_scheduler_lock_#{queue}_"
end

.queue_key(queue, hostname = nil) ⇒ Object



399
400
401
402
403
404
405
# File 'lib/mini_scheduler/manager.rb', line 399

def self.queue_key(queue, hostname = nil)
  if hostname
    "_scheduler_queue_#{queue}_#{hostname}_"
  else
    "_scheduler_queue_#{queue}_"
  end
end

.schedule_key(klass, hostname = nil) ⇒ Object



407
408
409
410
411
412
413
# File 'lib/mini_scheduler/manager.rb', line 407

def self.schedule_key(klass, hostname = nil)
  if hostname
    "_scheduler_#{klass}_#{hostname}"
  else
    "_scheduler_#{klass}"
  end
end

.seqObject



380
381
382
383
384
385
# File 'lib/mini_scheduler/manager.rb', line 380

def self.seq
  @class_mutex.synchronize do
    @i ||= 0
    @i += 1
  end
end

.without_runnerObject



207
208
209
# File 'lib/mini_scheduler/manager.rb', line 207

def self.without_runner
  self.new(skip_runner: true)
end

Instance Method Details

#blocking_tickObject



332
333
334
335
# File 'lib/mini_scheduler/manager.rb', line 332

def blocking_tick
  tick
  @runner.wait_till_done
end

#ensure_schedule!(klass) ⇒ Object



251
252
253
254
255
# File 'lib/mini_scheduler/manager.rb', line 251

def ensure_schedule!(klass)
  lock do
    schedule_info(klass).schedule!
  end
end

#get_klass(name) ⇒ Object



285
286
287
288
289
# File 'lib/mini_scheduler/manager.rb', line 285

def get_klass(name)
  name.constantize
rescue NameError
  nil
end

#hostnameObject



235
236
237
238
239
240
241
# File 'lib/mini_scheduler/manager.rb', line 235

def hostname
  @hostname ||= begin
                  `hostname`.strip
                rescue
                  "unknown"
                end
end

#identity_keyObject



388
389
390
391
392
393
# File 'lib/mini_scheduler/manager.rb', line 388

def identity_key
  return @identity_key if @identity_key
  @@identity_key_mutex.synchronize do
    @identity_key ||= "_scheduler_#{hostname}:#{Process.pid}:#{self.class.seq}:#{SecureRandom.hex}"
  end
end

#keep_alive(*ids) ⇒ Object



346
347
348
349
350
351
# File 'lib/mini_scheduler/manager.rb', line 346

def keep_alive(*ids)
  ids = [identity_key, *@runner.worker_thread_ids] if ids.size == 0
  ids.each do |identity_key|
    redis.setex identity_key, keep_alive_duration, ""
  end
end

#keep_alive_durationObject



342
343
344
# File 'lib/mini_scheduler/manager.rb', line 342

def keep_alive_duration
  60
end

#lockObject



353
354
355
356
357
# File 'lib/mini_scheduler/manager.rb', line 353

def lock
  MiniScheduler::DistributedMutex.synchronize(Manager.lock_key(queue), MiniScheduler.redis) do
    yield
  end
end

#next_run(klass) ⇒ Object



247
248
249
# File 'lib/mini_scheduler/manager.rb', line 247

def next_run(klass)
  schedule_info(klass).next_run
end

#remove(klass) ⇒ Object



257
258
259
260
261
# File 'lib/mini_scheduler/manager.rb', line 257

def remove(klass)
  lock do
    schedule_info(klass).del!
  end
end

#repair_queueObject



291
292
293
294
295
296
297
298
# File 'lib/mini_scheduler/manager.rb', line 291

def repair_queue
  return if redis.exists?(self.class.queue_key(queue)) ||
    redis.exists?(self.class.queue_key(queue, hostname))

  self.class.discover_schedules
    .select { |schedule| schedule.queue == queue }
    .each { |schedule| ensure_schedule!(schedule) }
end

#reschedule_orphans!Object



263
264
265
266
267
268
# File 'lib/mini_scheduler/manager.rb', line 263

def reschedule_orphans!
  lock do
    reschedule_orphans_on!
    reschedule_orphans_on!(hostname)
  end
end

#reschedule_orphans_on!(hostname = nil) ⇒ Object



270
271
272
273
274
275
276
277
278
279
280
281
282
283
# File 'lib/mini_scheduler/manager.rb', line 270

def reschedule_orphans_on!(hostname = nil)
  redis.zrange(Manager.queue_key(queue, hostname), 0, -1).each do |key|
    klass = get_klass(key)
    next unless klass
    info = schedule_info(klass)

    if ['QUEUED', 'RUNNING'].include?(info.prev_result) &&
      (info.current_owner.blank? || !redis.get(info.current_owner))
      info.prev_result = 'ORPHAN'
      info.next_run = Time.now.to_i
      info.write!
    end
  end
end

#schedule_info(klass) ⇒ Object



243
244
245
# File 'lib/mini_scheduler/manager.rb', line 243

def schedule_info(klass)
  MiniScheduler::ScheduleInfo.new(klass, self)
end

#schedule_next_job(hostname = nil) ⇒ Object



307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
# File 'lib/mini_scheduler/manager.rb', line 307

def schedule_next_job(hostname = nil)
  (key, due), _ = redis.zrange Manager.queue_key(queue, hostname), 0, 0, withscores: true
  return unless key

  if due.to_i <= Time.now.to_i
    klass = get_klass(key)
    if !klass || (
      (klass.is_per_host && !hostname) || (hostname && !klass.is_per_host)
    )
      # corrupt key, nuke it (renamed job or something)
      redis.zrem Manager.queue_key(queue, hostname), key
      return
    end

    info = schedule_info(klass)
    info.prev_run = Time.now.to_i
    info.prev_result = "QUEUED"
    info.prev_duration = -1
    info.next_run = nil
    info.current_owner = identity_key
    info.schedule!
    @runner.enq(klass)
  end
end

#stop!Object



337
338
339
340
# File 'lib/mini_scheduler/manager.rb', line 337

def stop!
  @runner.stop!
  self.class.current.delete(@queue)
end

#tickObject



300
301
302
303
304
305
# File 'lib/mini_scheduler/manager.rb', line 300

def tick
  lock do
    schedule_next_job
    schedule_next_job(hostname)
  end
end