Class: EmPipelines::AmqpEventSource

Inherits:
EventSource show all
Defined in:
lib/empipelines/amqp_event_source.rb

Overview

This must have an on_finished!

Instance Method Summary collapse

Methods inherited from EventSource

#on_event, #on_finished

Constructor Details

#initialize(em, queue, event_name, monitoring) ⇒ AmqpEventSource

Returns a new instance of AmqpEventSource.



10
11
12
# File 'lib/empipelines/amqp_event_source.rb', line 10

# Stores the collaborators this event source works with.
#
# @param em [Object] kept in @em; presumably the EventMachine reactor — TODO confirm
# @param queue [Object] queue that is subscribed to when the source is started
# @param event_name [Object] value placed in the :event field of each message built
# @param monitoring [Object] receiver of inform_exception! when handling a delivery fails
def initialize(em, queue, event_name, monitoring)
  @em = em
  @queue = queue
  @event_name = event_name
  @monitoring = monitoring
end

Instance Method Details

#start! ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/empipelines/amqp_event_source.rb', line 14

# Subscribes to @queue with :ack => true and turns every delivery into a
# Message that is passed to event!.
#
# For each delivery the body is parsed as JSON and wrapped in a Message
# together with the queue name, the configured event name and the current
# epoch time in seconds. The message callbacks are wired to the delivery:
# consumed acks it, broken rejects it without requeue, rejected rejects it
# with requeue.
#
# Any StandardError raised while building or dispatching the message is
# reported to @monitoring and the delivery is rejected without requeue.
#
# @return [Object] whatever @queue.subscribe returns
def start!
  @queue.subscribe(:ack => true) do |delivery, raw_json|
    begin
      # Built in the same order as the fields are listed: queue name first,
      # then JSON parsing (which may raise), then the timestamp.
      attributes = {
        :origin     => @queue.name,
        :payload    => JSON.parse(raw_json),
        :event      => @event_name,
        :started_at => Time.now.to_i
      }
      incoming = Message.new(attributes)

      incoming.on_consumed do |_message|
        delivery.ack
      end
      incoming.on_broken do |_message|
        delivery.reject(:requeue => false)
      end
      incoming.on_rejected do |_message|
        delivery.reject(:requeue => true)
      end

      event!(incoming)
    rescue => failure
      @monitoring.inform_exception!(failure, self, 'removing message from queue')
      delivery.reject(:requeue => false)
    end
  end
end