17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
# File 'lib/actn/jobs/worker.rb', line 17
def self.start
pg = PG::EM::Client.new(Actn::DB.db_config.dup.tap{|s| s.delete(:size) })
EM.run do
notify_proc = Proc.new { |notify|
raise JobError.new("Payload Missing") unless payload = (Oj.load(notify[:extra]) rescue nil)
raise JobError.new("Job not found") unless job = Job.find(payload['uuid'])
hook = job.hook
raise JobError.new("Hook name missing") unless hook['name']
raise JobError.new("Hook class missing") unless hook_class = (Object.const_get(hook['name']) rescue nil)
if (run_at = ((Time.parse(hook['run_at']) rescue nil) || eval(hook['run_at'])) ) > Time.now
EM.add_timer(run_at - Time.now, proc{ notify_proc.call(notify) })
else
hook_class.new(job).test_and_perform
end
}
error_proc = Proc.new { |ex|
Jobs.logger.error ex
}
wait_proc = Proc.new { |notify|
notify_proc.call(notify) if notify
pg.wait_for_notify_defer.callback(&wait_proc).errback(&error_proc)
}
pg.wait_for_notify_defer.callback(&wait_proc).errback(&error_proc)
pg.query_defer("LISTEN #{CHANNEL}")
end
end
|