68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
# File 'lib/disc.rb', line 68
def run
$stdout.puts("Disc::Worker listening in #{queues}")
loop do
jobs = disque.fetch(from: queues, timeout: timeout, count: count)
Array(jobs).each do |queue, msgid, serialized_job|
begin
job = MessagePack.unpack(serialized_job)
job.update('id' => msgid, 'queue' => queue)
instance = Object.const_get(job['class']).new
instance.perform(*job['arguments'])
disque.call('ACKJOB', msgid)
rescue => err
Disc.on_error(err, job)
end
end
end
end
|