97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
|
# File 'lib/queue_map/consumer.rb', line 97
# Runs the consumer loop until @shutting_down becomes truthy.
#
# Inside QueueMap.with_bunny the method repeatedly pops from the queue named
# after +name+. For every message it:
#
# 1. calls each hook in before_job_procs,
# 2. unmarshals the payload and hands its :input to worker_proc, bounded by
#    job_timeout seconds,
# 3. publishes Marshal.dump(:result => ..., :index => ...) to the queue named
#    by the payload's :response_queue,
# 4. calls each hook in after_response_procs (also when the job raised or
#    timed out).
#
# When the pop yields :queue_empty, idle_proc is called instead. A
# Timeout::Error is logged and the loop moves on to the next message. Any
# other exception leaves the with_bunny block, is handed to on_exception_proc
# (or logged when none is configured) and, after a 0.2 second pause,
# QueueMap.with_bunny is entered again.
#
# Both loops are do-while loops: the body runs once before @shutting_down is
# checked for the first time.
#
# @return the value of the final logger.info call
def run_consumer
  begin
    QueueMap.with_bunny do |bunny|
      queue_name = name.to_s
      q = bunny.queue(queue_name, :durable => false, :auto_delete => false, :ack => true)
      logger.info "Process #{Process.pid} is listening on #{queue_name}"

      begin
        msg = q.pop
        if msg == :queue_empty
          idle_proc.call
          next # jumps straight to the @shutting_down check below
        end

        before_job_procs.each { |hook| hook.call }
        begin
          Timeout.timeout(job_timeout) do
            # NOTE(review): Marshal.load can instantiate arbitrary objects; this is
            # only safe if every publisher on this queue is trusted.
            job = Marshal.load(msg)
            result = worker_proc.call(job[:input])
            response = Marshal.dump(:result => result, :index => job[:index])
            bunny.queue(job[:response_queue]).publish(response)
          end
        ensure
          after_response_procs.each { |hook| hook.call }
        end
      rescue Qrack::ClientTimeout
        # Deliberately ignored: fall through to the loop condition and pop again.
      rescue Timeout::Error
        logger.info "Job took longer than #{job_timeout} seconds to complete. Aborting"
      end until @shutting_down
    end
  rescue Exception => e
    # NOTE(review): rescuing Exception also traps SignalException and SystemExit.
    # Kept as-is because narrowing it would change which errors reach
    # on_exception_proc and which ones trigger a fresh with_bunny call.
    if on_exception_proc
      on_exception_proc.call(e)
    else
      logger.info e.class
      logger.error e.message
      logger.error e.backtrace
    end
    sleep 0.2 # brief pause before entering with_bunny again
  end until @shutting_down
  logger.info "Done."
end
|