Class: Sidecloq::Scheduler
- Inherits:
-
Object
- Object
- Sidecloq::Scheduler
show all
- Includes:
- Utils
- Defined in:
- lib/sidecloq/scheduler.rb
Overview
Scheduler enqueues jobs according to the given schedule
Instance Method Summary
collapse
Methods included from Utils
included, #logger, #redis, #will_never_run
Constructor Details
#initialize(schedule, options = {}) ⇒ Scheduler
Returns a new instance of Scheduler.
6
7
8
9
10
11
|
# File 'lib/sidecloq/scheduler.rb', line 6
def initialize(schedule, options = {})
@schedule = schedule
@options = options
@loaded = Concurrent::Event.new
@running = false
end
|
Instance Method Details
#load_into_rufus(name, spec) ⇒ Object
55
56
57
58
59
60
61
62
63
64
65
66
|
# File 'lib/sidecloq/scheduler.rb', line 55
def load_into_rufus(name, spec)
if will_never_run(spec['cron'])
logger.info("Impossible cronline detected, not scheduling #{name}: #{spec}")
else
logger.info("Scheduling #{name}: #{spec}")
rufus.cron(spec['cron']) do
safe_enqueue_job(name, spec)
end
end
end
|
#load_schedule_into_rufus ⇒ Object
47
48
49
50
51
52
53
|
# File 'lib/sidecloq/scheduler.rb', line 47
def load_schedule_into_rufus
logger.debug('Scheduling jobs')
@schedule.job_specs.each do |name, spec|
load_into_rufus(name, spec)
end
@loaded.set
end
|
#rufus ⇒ Object
39
40
41
|
# File 'lib/sidecloq/scheduler.rb', line 39
def rufus
@rufus ||= Rufus::Scheduler.new
end
|
#run ⇒ Object
run queues jobs per their schedules, blocking forever
14
15
16
17
18
19
20
21
|
# File 'lib/sidecloq/scheduler.rb', line 14
def run
@running = true
logger.info('Loading schedules into redis')
sync_with_redis
logger.info('Starting scheduler')
load_schedule_into_rufus
rufus.join
end
|
#safe_enqueue_job(name, spec) ⇒ Object
68
69
70
71
72
73
74
75
76
77
|
# File 'lib/sidecloq/scheduler.rb', line 68
def safe_enqueue_job(name, spec)
logger.info "enqueuing #{name}"
begin
JobEnqueuer.new(spec).enqueue
rescue => e
logger.info "error enqueuing #{name} - #{e.class.name}: #{e.message}"
end
end
|
#stop(timeout = nil) ⇒ Object
23
24
25
26
27
28
29
30
31
|
# File 'lib/sidecloq/scheduler.rb', line 23
def stop(timeout = nil)
return unless @running
logger.info("Stopping scheduler (timeout: #{timeout})")
rufus.shutdown(:kill)
rufus.thread.join(timeout)
@running = false
logger.info('Stopped scheduler')
end
|
#sync_with_redis ⇒ Object
43
44
45
|
# File 'lib/sidecloq/scheduler.rb', line 43
def sync_with_redis
@schedule.save_redis
end
|
#wait_for_loaded ⇒ Object
35
36
37
|
# File 'lib/sidecloq/scheduler.rb', line 35
def wait_for_loaded
@loaded.wait
end
|