Class: Hopper::Channel::Queue::Listener

Inherits:
Object
  • Object
show all
Defined in:
lib/hopper/channel/queue/listener.rb

Instance Method Summary collapse

Constructor Details

#initialize(proxy, opts = {}) ⇒ Listener

Returns a new instance of Listener.



8
9
10
# File 'lib/hopper/channel/queue/listener.rb', line 8

def initialize(proxy, opts = {})
  @proxy = proxy
end

Instance Method Details

#acknowledge(message) ⇒ Object



24
25
26
# File 'lib/hopper/channel/queue/listener.rb', line 24

def acknowledge(message)
  bunny_channel.acknowledge(message.tag)
end

#listenObject



12
13
14
15
16
17
18
19
20
21
22
# File 'lib/hopper/channel/queue/listener.rb', line 12

def listen
  @proxy.subscribe(:ack => true, :block => true) do |delivery_information, properties, payload|
    message = Message::Delivered.new(self, delivery_information, properties, payload)
    begin
      yield message
    rescue Exception => e
      Hopper.logger.error "Caught exception #{e} that worker should have caught. Message #{message.tag} being rejected by default.\n\n#{e.backtrace.join("\n")}"
      reject(message)
    end
  end
end

#reject(message) ⇒ Object



32
33
34
# File 'lib/hopper/channel/queue/listener.rb', line 32

def reject(message)
  bunny_channel.reject(message.tag, false)
end

#retry(message) ⇒ Object



28
29
30
# File 'lib/hopper/channel/queue/listener.rb', line 28

def retry(message)
  bunny_channel.reject(message.tag, true)
end