Method: QueueMap::Consumer#run_consumer

Defined in:
lib/queue_map/consumer.rb

#run_consumer ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/queue_map/consumer.rb', line 97

# Runs the consumer loop until +@shutting_down+ becomes truthy.
#
# Inside a +QueueMap.with_bunny+ session the method declares the queue named
# after +name+ and repeatedly pops messages from it:
#
# * when the pop returns +:queue_empty+, +idle_proc+ is called;
# * otherwise every +before_job_procs+ entry is called, the payload is
#   unmarshalled, +worker_proc+ is called with the payload's +:input+, and a
#   marshalled <tt>{:result, :index}</tt> hash is published to the payload's
#   +:response_queue+ -- all within +job_timeout+ seconds. Every
#   +after_response_procs+ entry is called afterwards, even if the job raised.
#
# A +Qrack::ClientTimeout+ is ignored and a +Timeout::Error+ is logged; in both
# cases the loop carries on with the same connection. Any other exception ends
# the current +with_bunny+ session, is handed to +on_exception_proc+ (or
# logged when none is set) and, after a 0.2 second pause, a new session is
# opened.
#
# Process-control exceptions (+SystemExit+, +SignalException+ including
# +Interrupt+, and +NoMemoryError+) are re-raised instead of being treated as
# job failures, so +exit+ and untrapped signals can still stop the process.
#
# @return [Object] the return value of the final +logger.info+ call; not meaningful
# @raise [SystemExit, SignalException, NoMemoryError] propagated unchanged
def run_consumer
  begin
    QueueMap.with_bunny do |bunny|
      q = bunny.queue(name.to_s, :durable => false, :auto_delete => false, :ack => true)
      logger.info "Process #{Process.pid} is listening on #{name.to_s}"
      begin
        msg = q.pop
        if msg == :queue_empty
          idle_proc.call
        else
          before_job_procs.each { |p| p.call }
          begin
            Timeout.timeout(job_timeout) do
              # SECURITY NOTE(review): Marshal.load can instantiate arbitrary
              # objects. This is only safe while every publisher on this queue
              # is trusted -- confirm before exposing the broker more widely.
              job = Marshal.load(msg)
              result = worker_proc.call(job[:input])
              reply = Marshal.dump(:result => result, :index => job[:index])
              bunny.queue(job[:response_queue]).publish(reply)
            end
          ensure
            # Runs whether the job succeeded, raised, or timed out.
            after_response_procs.each { |p| p.call }
          end
        end
      rescue Qrack::ClientTimeout
        # Deliberately ignored: just try the next pop.
      rescue Timeout::Error
        logger.info "Job took longer than #{job_timeout} seconds to complete. Aborting"
      end until @shutting_down
    end
  rescue SystemExit, SignalException, NoMemoryError
    # Never swallow requests to terminate the process (exit, signals) or
    # out-of-memory conditions; reconnecting would only mask them.
    raise
  rescue Exception => e # Bunny gets into a strange state when exceptions are raised, so reconnect to queue server if it happens
    if on_exception_proc
      on_exception_proc.call(e)
    else
      logger.info e.class
      logger.error e.message
      logger.error e.backtrace
    end
    # Brief pause so a persistently failing connection does not spin the CPU.
    sleep 0.2
  end until @shutting_down
  logger.info "Done."
end