Method: EmPipelines::AmqpEventSource#start!

Defined in:
lib/empipelines/amqp_event_source.rb

#start!Object



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/empipelines/amqp_event_source.rb', line 14

def start!
  @queue.subscribe(:ack => true) do |header, json_payload|
    begin
      message = Message.new({
                              :origin     => @queue.name,
                              :payload    => JSON.parse(json_payload),
                              :event      => @event_name,
                              :started_at => Time.now.to_i
                            })
      message.on_consumed { |m| header.ack }
      message.on_broken   { |m| header.reject(:requeue => false) }
      message.on_rejected { |m| header.reject(:requeue => true) }
      event!(message)
    rescue => exc
      @monitoring.inform_exception!(exc, self, 'removing message from queue')
      header.reject(:requeue => false)
    end
  end
end