10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
# File 'lib/skiplock/dispatcher.rb', line 10
def run
Thread.new do
Rails.application.reloader.wrap do
sleep(0.1) while @running && !Rails.application.initialized?
ActiveRecord::Base.connection_pool.with_connection do |connection|
connection.exec_query('LISTEN skiplock')
if @master
Job.where('scheduled_at > NOW() AND executions IS NOT NULL AND expired_at IS NULL AND finished_at IS NULL').update_all(scheduled_at: nil, updated_at: Time.now)
Cron.setup
end
error = false
while @running
begin
if error
unless connection.active?
connection.reconnect!
sleep(0.5)
connection.exec_query('LISTEN skiplock')
@next_schedule_at = Time.now
end
error = false
end
if Time.now.to_f >= @next_schedule_at && @executor.remaining_capacity > 0
@executor.post { do_work }
end
notifications = []
connection.raw_connection.wait_for_notify(0.1) do |channel, pid, payload|
notifications << payload if payload
loop do
payload = connection.raw_connection.notifies
break unless @running && payload
notifications << payload[:extra]
end
notifications.each do |n|
op, id, queue, priority, time = n.split(',')
if time.to_f <= Time.now.to_f
@next_schedule_at = Time.now.to_f
elsif time.to_f < @next_schedule_at
@next_schedule_at = time.to_f
end
end
end
rescue Exception => ex
error = true
timestamp = Time.now
while @running
sleep(0.5)
break if Time.now - timestamp > 10
end
end
sleep(0.1)
end
connection.exec_query('UNLISTEN *')
end
end
end
end
|