Class: Sidecloq::Scheduler

Inherits:
Object
  • Object
show all
Includes:
Utils
Defined in:
lib/sidecloq/scheduler.rb

Overview

Scheduler enqeues jobs according to the given schedule

Instance Method Summary collapse

Methods included from Utils

included, #logger, #redis, #will_never_run

Constructor Details

#initialize(schedule, options = {}) ⇒ Scheduler

Returns a new instance of Scheduler.



6
7
8
9
# File 'lib/sidecloq/scheduler.rb', line 6

def initialize(schedule, options = {})
  @schedule = schedule
  @options = options
end

Instance Method Details

#enqueue_job!(name, spec) ⇒ Object

can raise exceptions, but shouldn’t



81
82
83
# File 'lib/sidecloq/scheduler.rb', line 81

def enqueue_job!(name, spec)
  Sidekiq::Client.push(spec)
end

#load_into_rufus(name, spec) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/sidecloq/scheduler.rb', line 56

def load_into_rufus(name, spec)
  # rufus will loop indefinitely trying to find the next event time if the
  # cronline is impossible, like '0 5 31 2 *'
  if will_never_run(spec['cron'])
    logger.info("Impossible cronline detected, not scheduling #{name}: #{spec}")
  else
    logger.info("Scheduling #{name}: #{spec}")
    rufus.cron(spec['cron']) do
      safe_enqueue_job(name, spec)
    end
  end
end

#load_schedule_into_rufusObject



49
50
51
52
53
54
# File 'lib/sidecloq/scheduler.rb', line 49

def load_schedule_into_rufus
  logger.debug("Scheduling jobs")
  @schedule.job_specs.each do |name, spec|
    load_into_rufus(name, spec)
  end
end

#rufusObject



41
42
43
# File 'lib/sidecloq/scheduler.rb', line 41

def rufus
  @rufus ||= Rufus::Scheduler.new
end

#runObject

run queues jobs per their schedules, blocking forever



12
13
14
15
16
17
18
19
20
# File 'lib/sidecloq/scheduler.rb', line 12

def run
  logger.info("Loading schedules into redis")
  sync_with_redis
  logger.info("Starting scheduler")
  load_schedule_into_rufus
  logger.debug("Joining rufus thread")
  rufus.join
  logger.debug("Scheduler run ended")
end

#safe_enqueue_job(name, spec) ⇒ Object



69
70
71
72
73
74
75
76
77
78
# File 'lib/sidecloq/scheduler.rb', line 69

def safe_enqueue_job(name, spec)
  logger.info "enqueueing #{name}"

  # failed enqeueuing should not b0rk stuff
  begin
    enqueue_job!(name, spec)
  rescue => e
    logger.info "error enqueuing #{name} - #{e.class.name}: #{e.message}"
  end
end

#stop(timeout = nil) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/sidecloq/scheduler.rb', line 22

def stop(timeout = nil)
  logger.info("Stopping scheduler (timeout: #{timeout})")
  if timeout
    t = Concurrent::ScheduledTask.new(timeout) do
      rufus.shutdown(:kill) if rufus.up?
    end
    Thread.new do
      rufus.shutdown(:wait)
      t.cancel
    end
  else
    rufus.shutdown(:wait)
  end
  rufus.join
  logger.info("Stopped scheduler")
end

#sync_with_redisObject



45
46
47
# File 'lib/sidecloq/scheduler.rb', line 45

def sync_with_redis
  @schedule.save_redis
end