14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
# File 'lib/empipelines/amqp_event_source.rb', line 14
# Subscribes to @queue with explicit acknowledgements enabled and turns each
# delivery into a Message that is passed to event!.
#
# For every delivery the JSON payload is parsed and wrapped in a Message
# carrying the queue name (:origin), the parsed payload (:payload),
# @event_name (:event) and the current epoch seconds (:started_at).
# The message's callbacks are wired to the delivery header:
#
# * on_consumed - acknowledges the delivery
# * on_broken   - rejects the delivery without requeueing it
# * on_rejected - rejects the delivery and asks for it to be requeued
#
# Any StandardError raised while building or dispatching the message
# (including a payload that is not valid JSON) is reported to @monitoring
# and the delivery is rejected without requeueing.
#
# @return whatever @queue.subscribe returns
def start!
  @queue.subscribe(:ack => true) do |header, json_payload|
    begin
      message = Message.new({
        :origin     => @queue.name,
        :payload    => JSON.parse(json_payload),
        :event      => @event_name,
        :started_at => Time.now.to_i
      })

      # The callbacks receive the message, but only the delivery header is
      # needed to settle the delivery with the broker.
      message.on_consumed { |_m| header.ack }
      message.on_broken   { |_m| header.reject(:requeue => false) }
      message.on_rejected { |_m| header.reject(:requeue => true) }

      event!(message)
    rescue => exc
      @monitoring.inform_exception!(exc, self, 'removing message from queue')
      # Drop the delivery rather than requeue it, so a message that cannot be
      # processed is not redelivered over and over.
      header.reject(:requeue => false)
    end
  end
end
|