Method: Disc::Worker#run

Defined in:
lib/disc.rb

#runObject



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/disc.rb', line 68

def run
  $stdout.puts("Disc::Worker listening in #{queues}")
  loop do
    jobs = disque.fetch(from: queues, timeout: timeout, count: count)
    Array(jobs).each do |queue, msgid, serialized_job|
      begin
        job = MessagePack.unpack(serialized_job)
        job.update('id' => msgid, 'queue' => queue)
        instance = Object.const_get(job['class']).new
        instance.perform(*job['arguments'])
        disque.call('ACKJOB', msgid)
      rescue => err
        Disc.on_error(err, job)
      end
    end
  end
end