65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
# File 'lib/cloudist/queues/basic_queue.rb', line 65
# Subscribes to the underlying queue with explicit acknowledgements and hands
# each incoming message to +block+, wrapped in a Cloudist::Request.
#
# For every delivery a request object is built from this queue, the encoded
# message and the delivery header; the actual handling is then scheduled via
# EM.defer instead of being run inline in the subscription callback.
#
# Inside the handler:
# * a request reporting +expired?+ is not passed to +block+; a timeout line
#   is written to the error log instead;
# * any other StandardError raised while handling is forwarded to
#   Cloudist.handle_error and not re-raised;
# * the request is acknowledged regardless of the outcome.
#
# @yieldparam request [Cloudist::Request] the request built for the delivery
# @return [self] the receiver, after logging the subscription
def subscribe(&block)
  queue.subscribe(:ack => true) do |queue_header, encoded_message|
    request = Cloudist::Request.new(self, encoded_message, queue_header)

    handle_request = proc {
      begin
        raise Cloudist::ExpiredMessage if request.expired?

        block.call(request)
      rescue Cloudist::ExpiredMessage
        log.error "AMQP Message Timeout: #{tag} ttl=#{request.ttl} age=#{request.age}"
      rescue => e
        Cloudist.handle_error(e)
      ensure
        # Acknowledge on success, expiry and failure alike.
        request.ack
      end
    }

    # NOTE(review): together with the `ensure` above this acknowledges the
    # same request a second time once the deferred work completes. Confirm
    # that Request#ack tolerates repeated calls, or drop one of the two.
    handle_ack = proc {
      request.ack
    }

    EM.defer(handle_request, handle_ack)
  end

  log.info "AMQP Subscribed: #{tag}"
  self
end
|