Class: MiniScheduler::Manager

Inherits:
Object
  • Object
show all
Defined in:
lib/mini_scheduler/manager.rb

Defined Under Namespace

Classes: Runner

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = nil) ⇒ Manager

Returns a new instance of Manager.



169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/mini_scheduler/manager.rb', line 169

# Builds a manager bound to the Redis connection from MiniScheduler.redis.
#
# options (Hash or nil) may contain:
#   :skip_runner  - when truthy, no Runner is created and this instance is
#                   not registered as the class-level current manager
#   :hostname     - stored in @hostname (nil when not given)
#   :enable_stats - stored as-is whenever the key is present (even if it is
#                   nil/false); defaults to true otherwise
#
# All instance state is assigned BEFORE the Runner is created and before the
# instance is published through self.class.current=. Doing it the other way
# round hands the Runner (and any reader of the current manager) an object
# whose @hostname, @manager_id and @enable_stats are still unset.
def initialize(options = nil)
  opts = options || {}

  @redis = MiniScheduler.redis
  @random_ratio = 0.1
  @hostname = opts[:hostname]
  @manager_id = SecureRandom.hex
  # Deriving the default from !!defined?(MiniScheduler::Stat) doesn't work,
  # so stats are on unless the caller passes :enable_stats explicitly.
  @enable_stats = opts.fetch(:enable_stats, true)

  return if opts[:skip_runner]

  @runner = Runner.new(self)
  self.class.current = self
end

Instance Attribute Details

#enable_stats ⇒ Object

Returns the value of attribute enable_stats.



3
4
5
# File 'lib/mini_scheduler/manager.rb', line 3

# Reader for @enable_stats. #initialize stores options[:enable_stats] when
# that key is given and true otherwise.
def enable_stats
  @enable_stats
end

#random_ratio ⇒ Object

Returns the value of attribute random_ratio.



3
4
5
# File 'lib/mini_scheduler/manager.rb', line 3

# Reader for @random_ratio, which #initialize sets to 0.1.
# How the ratio is consumed is not visible in this class's methods.
def random_ratio
  @random_ratio
end

#redis ⇒ Object

Returns the value of attribute redis.



3
4
5
# File 'lib/mini_scheduler/manager.rb', line 3

# Reader for @redis, which #initialize assigns from MiniScheduler.redis.
# This is the connection used for the queue, keep-alive and orphan checks.
def redis
  @redis
end

Class Method Details

.current ⇒ Object



187
188
189
# File 'lib/mini_scheduler/manager.rb', line 187

# Returns the manager stored in the class-level @current, or nil when none
# has been assigned. #initialize registers a manager here unless it was
# built with :skip_runner; #stop! resets it to nil.
def self.current
  @current
end

.current=(manager) ⇒ Object



191
192
193
# File 'lib/mini_scheduler/manager.rb', line 191

# Stores +manager+ in the class-level @current, replacing whatever was
# there. Pass nil to clear it.
def self.current=(manager)
  @current = manager
end

.discover_schedules ⇒ Object



300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
# File 'lib/mini_scheduler/manager.rb', line 300

# Collects every scheduled job currently alive in the process.
#
# Walks ObjectSpace for objects that are a MiniScheduler::Schedule and keeps
# those answering true to scheduled?. Results are de-duplicated by their
# to_s, because the development reloader can leave several classes with the
# same name in object space; the first one encountered wins.
#
# Returns an Array of schedules.
def self.discover_schedules
  seen_names = Set.new
  found = []

  ObjectSpace.each_object(MiniScheduler::Schedule) do |candidate|
    next unless candidate.scheduled?

    # Set#add? returns nil when the name was already recorded.
    found << candidate if seen_names.add?(candidate.to_s)
  end

  found
end

.lock_key ⇒ Object



328
329
330
# File 'lib/mini_scheduler/manager.rb', line 328

# Key under which the scheduler-wide distributed lock is taken; #lock passes
# it to MiniScheduler::DistributedMutex.synchronize.
def self.lock_key
  "_scheduler_lock_"
end

.queue_key(hostname = nil) ⇒ Object



332
333
334
335
336
337
338
# File 'lib/mini_scheduler/manager.rb', line 332

# Name of the Redis sorted set that holds queued schedules.
# Without a hostname this is the shared queue; with one, the key is
# specific to that host.
def self.queue_key(hostname = nil)
  return "_scheduler_queue_" unless hostname

  "_scheduler_queue_#{hostname}_"
end

.schedule_key(klass, hostname = nil) ⇒ Object



340
341
342
343
344
345
346
# File 'lib/mini_scheduler/manager.rb', line 340

# Builds the key for a schedule: "_scheduler_<klass>", with "_<hostname>"
# appended when a hostname is supplied.
def self.schedule_key(klass, hostname = nil)
  base = "_scheduler_#{klass}"
  hostname ? "#{base}_#{hostname}" : base
end

.seq ⇒ Object



317
318
319
320
321
322
# File 'lib/mini_scheduler/manager.rb', line 317

# Returns the next value of a counter kept in the class-level @i, starting
# at 1. The increment runs inside @mutex.synchronize; @mutex is not assigned
# in this method and must already exist on the receiver.
def self.seq
  @mutex.synchronize { @i = (@i || 0) + 1 }
end

.without_runner ⇒ Object



165
166
167
# File 'lib/mini_scheduler/manager.rb', line 165

# Convenience constructor: builds an instance with skip_runner: true, so no
# Runner is created and the instance is not registered as the current
# manager (see #initialize).
def self.without_runner
  new(skip_runner: true)
end

Instance Method Details

#blocking_tick ⇒ Object



276
277
278
279
# File 'lib/mini_scheduler/manager.rb', line 276

# Runs one #tick and then blocks on @runner.wait_till_done.
# NOTE(review): @runner is only assigned when the manager was built without
# :skip_runner; on a runner-less manager this would raise NoMethodError on
# nil — confirm callers never do that.
def blocking_tick
  tick
  @runner.wait_till_done
end

#ensure_schedule!(klass) ⇒ Object



207
208
209
210
211
# File 'lib/mini_scheduler/manager.rb', line 207

# Calls schedule! on the ScheduleInfo for +klass+ while holding the
# distributed lock (#lock). Returns whatever #lock returns.
def ensure_schedule!(klass)
  lock { schedule_info(klass).schedule! }
end

#get_klass(name) ⇒ Object



241
242
243
244
245
# File 'lib/mini_scheduler/manager.rb', line 241

# Resolves +name+ to a constant by calling name.constantize.
# Returns nil instead of raising when the lookup fails with NameError
# (NoMethodError is a NameError subclass, so it is swallowed as well).
# NOTE(review): constantize is not core Ruby — presumably ActiveSupport's
# String#constantize; confirm it is loaded wherever this runs.
def get_klass(name)
  name.constantize
rescue NameError
  nil
end

#hostname ⇒ Object



195
196
197
# File 'lib/mini_scheduler/manager.rb', line 195

# Hostname used for the host-specific queue and the identity key.
# Returns @hostname when it is already truthy; otherwise shells out to the
# `hostname` command, caches the stripped output in @hostname and returns it.
def hostname
  return @hostname if @hostname

  @hostname = `hostname`.strip
end

#identity_key ⇒ Object



324
325
326
# File 'lib/mini_scheduler/manager.rb', line 324

# Memoized key identifying this manager instance, of the form
# "_scheduler_<hostname>:<pid>:<class-level seq>:<random hex>".
# Computed once; later calls return the cached string.
def identity_key
  return @identity_key if @identity_key

  host = hostname
  pid = Process.pid
  sequence = self.class.seq
  nonce = SecureRandom.hex
  @identity_key = "_scheduler_#{host}:#{pid}:#{sequence}:#{nonce}"
end

#keep_alive ⇒ Object



290
291
292
# File 'lib/mini_scheduler/manager.rb', line 290

# Refreshes this manager's liveness marker: writes an empty string under
# identity_key via redis.setex, expiring after keep_alive_duration.
# Returns whatever setex returns.
def keep_alive
  client = redis
  key = identity_key
  ttl = keep_alive_duration
  client.setex(key, ttl, "")
end

#keep_alive_duration ⇒ Object



286
287
288
# File 'lib/mini_scheduler/manager.rb', line 286

# Expiry passed to redis.setex by #keep_alive for the identity key.
# Presumably seconds (the unit Redis SETEX takes) — confirm against the
# Redis client in use.
def keep_alive_duration
  60
end

#lock ⇒ Object



294
295
296
297
298
# File 'lib/mini_scheduler/manager.rb', line 294

# Yields to the caller's block inside
# MiniScheduler::DistributedMutex.synchronize, keyed by Manager.lock_key and
# using the MiniScheduler.redis connection. Returns whatever synchronize returns.
def lock
  MiniScheduler::DistributedMutex.synchronize(Manager.lock_key, MiniScheduler.redis) { yield }
end

#next_run(klass) ⇒ Object



203
204
205
# File 'lib/mini_scheduler/manager.rb', line 203

# Returns next_run as reported by the ScheduleInfo for +klass+.
# No lock is taken for this read.
def next_run(klass)
  info = schedule_info(klass)
  info.next_run
end

#remove(klass) ⇒ Object



213
214
215
216
217
# File 'lib/mini_scheduler/manager.rb', line 213

# Calls del! on the ScheduleInfo for +klass+ while holding the distributed
# lock (#lock). Returns whatever #lock returns.
def remove(klass)
  lock { schedule_info(klass).del! }
end

#reschedule_orphans! ⇒ Object



219
220
221
222
223
224
# File 'lib/mini_scheduler/manager.rb', line 219

# Re-queues orphaned jobs while holding the distributed lock (#lock):
# first on the shared queue (no hostname), then on this host's own queue.
def reschedule_orphans!
  lock do
    reschedule_orphans_on!
    reschedule_orphans_on!(hostname)
  end
end

#reschedule_orphans_on!(hostname = nil) ⇒ Object



226
227
228
229
230
231
232
233
234
235
236
237
238
239
# File 'lib/mini_scheduler/manager.rb', line 226

# Scans every entry of the queue for +hostname+ (the shared queue when nil)
# and marks abandoned jobs so they run again.
#
# A job counts as orphaned when its previous result is still 'QUEUED' or
# 'RUNNING' and its owner is either blank or no longer has a key in Redis.
# Such a job gets prev_result 'ORPHAN', next_run set to now, and is written
# back. Entries whose class cannot be resolved are skipped.
#
# Takes no lock itself; #reschedule_orphans! wraps it in one.
def reschedule_orphans_on!(hostname = nil)
  queued_names = redis.zrange(Manager.queue_key(hostname), 0, -1)

  queued_names.each do |name|
    job_class = get_klass(name)
    next unless job_class

    info = schedule_info(job_class)
    next unless ['QUEUED', 'RUNNING'].include?(info.prev_result)
    # Owner is still alive: it is set AND its key still exists in Redis.
    next if !info.current_owner.blank? && redis.get(info.current_owner)

    info.prev_result = 'ORPHAN'
    info.next_run = Time.now.to_i
    info.write!
  end
end

#schedule_info(klass) ⇒ Object



199
200
201
# File 'lib/mini_scheduler/manager.rb', line 199

# Builds a fresh MiniScheduler::ScheduleInfo for +klass+, bound to this
# manager. A new object is created on every call (nothing is cached here).
def schedule_info(klass)
  MiniScheduler::ScheduleInfo.new(klass, self)
end

#schedule_next_job(hostname = nil) ⇒ Object



254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
# File 'lib/mini_scheduler/manager.rb', line 254

# Looks at the head of the queue for +hostname+ (the shared queue when nil)
# and, if that entry is due, hands its job class to the runner.
#
# Returns nil when the queue is empty, when the head entry is not due yet,
# or when the entry's class cannot be resolved (the entry is then removed
# from the queue). Otherwise the schedule info is stamped as QUEUED and
# owned by this manager, rescheduled, and the result of @runner.enq is
# returned.
#
# Takes no lock itself; #tick wraps it in one.
def schedule_next_job(hostname = nil)
  queue = Manager.queue_key(hostname)
  (key, due), _ = redis.zrange(queue, 0, 0, withscores: true)
  return unless key
  return if due.to_i > Time.now.to_i

  job_class = get_klass(key)
  unless job_class
    # corrupt key, nuke it (renamed job or something)
    redis.zrem(queue, key)
    return
  end

  info = schedule_info(job_class)
  info.prev_run = Time.now.to_i
  info.prev_result = "QUEUED"
  info.prev_duration = -1
  info.next_run = nil
  info.current_owner = identity_key
  info.schedule!
  @runner.enq(job_class)
end

#stop! ⇒ Object



281
282
283
284
# File 'lib/mini_scheduler/manager.rb', line 281

# Stops the runner, then resets the class-level current manager to nil
# (unconditionally — even if a different instance is registered there).
# NOTE(review): @runner is nil when the manager was built with
# :skip_runner, in which case this raises NoMethodError — confirm that such
# managers are never stopped.
def stop!
  @runner.stop!
  self.class.current = nil
end

#tick ⇒ Object



247
248
249
250
251
252
# File 'lib/mini_scheduler/manager.rb', line 247

# Performs one scheduling pass while holding the distributed lock (#lock):
# first against the shared queue, then against this host's own queue.
# At most one job per queue is handed to the runner per pass.
def tick
  lock do
    schedule_next_job
    schedule_next_job(hostname)
  end
end