Class: Sidecloq::Scheduler
- Inherits:
-
Object
- Object
- Sidecloq::Scheduler
show all
- Includes:
- Utils
- Defined in:
- lib/sidecloq/scheduler.rb
Overview
Scheduler enqeues jobs according to the given schedule
Instance Method Summary
collapse
Methods included from Utils
included, #logger, #redis, #will_never_run
Constructor Details
#initialize(schedule, options = {}) ⇒ Scheduler
Returns a new instance of Scheduler.
6
7
8
9
|
# File 'lib/sidecloq/scheduler.rb', line 6
def initialize(schedule, options = {})
@schedule = schedule
@options = options
end
|
Instance Method Details
#enqueue_job!(name, spec) ⇒ Object
can raise exceptions, but shouldn’t
81
82
83
|
# File 'lib/sidecloq/scheduler.rb', line 81
def enqueue_job!(name, spec)
Sidekiq::Client.push(spec)
end
|
#load_into_rufus(name, spec) ⇒ Object
56
57
58
59
60
61
62
63
64
65
66
67
|
# File 'lib/sidecloq/scheduler.rb', line 56
def load_into_rufus(name, spec)
if will_never_run(spec['cron'])
logger.info("Impossible cronline detected, not scheduling #{name}: #{spec}")
else
logger.info("Scheduling #{name}: #{spec}")
rufus.cron(spec['cron']) do
safe_enqueue_job(name, spec)
end
end
end
|
#load_schedule_into_rufus ⇒ Object
49
50
51
52
53
54
|
# File 'lib/sidecloq/scheduler.rb', line 49
def load_schedule_into_rufus
logger.debug("Scheduling jobs")
@schedule.job_specs.each do |name, spec|
load_into_rufus(name, spec)
end
end
|
#rufus ⇒ Object
41
42
43
|
# File 'lib/sidecloq/scheduler.rb', line 41
def rufus
@rufus ||= Rufus::Scheduler.new
end
|
#run ⇒ Object
run queues jobs per their schedules, blocking forever
12
13
14
15
16
17
18
19
20
|
# File 'lib/sidecloq/scheduler.rb', line 12
def run
logger.info("Loading schedules into redis")
sync_with_redis
logger.info("Starting scheduler")
load_schedule_into_rufus
logger.debug("Joining rufus thread")
rufus.join
logger.debug("Scheduler run ended")
end
|
#safe_enqueue_job(name, spec) ⇒ Object
69
70
71
72
73
74
75
76
77
78
|
# File 'lib/sidecloq/scheduler.rb', line 69
def safe_enqueue_job(name, spec)
logger.info "enqueueing #{name}"
begin
enqueue_job!(name, spec)
rescue => e
logger.info "error enqueuing #{name} - #{e.class.name}: #{e.message}"
end
end
|
#stop(timeout = nil) ⇒ Object
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
# File 'lib/sidecloq/scheduler.rb', line 22
def stop(timeout = nil)
logger.info("Stopping scheduler (timeout: #{timeout})")
if timeout
t = Concurrent::ScheduledTask.new(timeout) do
rufus.shutdown(:kill) if rufus.up?
end
Thread.new do
rufus.shutdown(:wait)
t.cancel
end
else
rufus.shutdown(:wait)
end
rufus.join
logger.info("Stopped scheduler")
end
|
#sync_with_redis ⇒ Object
45
46
47
|
# File 'lib/sidecloq/scheduler.rb', line 45
def sync_with_redis
@schedule.save_redis
end
|