Class: EmPipelines::AmqpEventSource
- Inherits: EventSource
- Inheritance chain: Object, EventSource, EmPipelines::AmqpEventSource
- Defined in: lib/empipelines/amqp_event_source.rb
Overview
This must have an on_finished! method.
Instance Method Summary
- #initialize(em, queue, event_name, monitoring) ⇒ AmqpEventSource (constructor): A new instance of AmqpEventSource.
- #start! ⇒ Object
Methods inherited from EventSource
Constructor Details
#initialize(em, queue, event_name, monitoring) ⇒ AmqpEventSource
Returns a new instance of AmqpEventSource.
# File 'lib/empipelines/amqp_event_source.rb', line 10
def initialize(em, queue, event_name, monitoring)
  @em, @queue, @event_name, @monitoring = em, queue, event_name, monitoring
end
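The constructor only stores its four collaborators, so any object that answers the messages sent in start! can stand in for them. The following is a minimal sketch of such test doubles. FakeHeader, FakeQueue and FakeMonitoring are hypothetical names that do not exist in EmPipelines, the queue name and event name are made-up values, and the commented-out line assumes the gem is loaded.

```ruby
# Hypothetical test doubles; none of these classes exist in EmPipelines.

# Records what a subscriber does with a delivery.
class FakeHeader
  attr_reader :calls

  def initialize
    @calls = []
  end

  def ack
    @calls << [:ack]
  end

  def reject(opts = {})
    @calls << [:reject, opts]
  end
end

# Answers #name and #subscribe, the only queue messages sent by start!.
class FakeQueue
  attr_reader :name, :subscribe_options

  def initialize(name)
    @name = name
  end

  def subscribe(opts = {}, &handler)
    @subscribe_options = opts
    @handler = handler
  end

  # Simulates the broker handing one message to the subscriber.
  def deliver(header, payload)
    @handler.call(header, payload)
  end
end

# Answers #inform_exception!, the only monitoring message sent by start!.
class FakeMonitoring
  attr_reader :exceptions

  def initialize
    @exceptions = []
  end

  def inform_exception!(exc, source, action)
    @exceptions << [exc, source, action]
  end
end

queue = FakeQueue.new("events.in")   # made-up queue name
monitoring = FakeMonitoring.new

# With the gem loaded, the doubles would be passed in like this
# (em is not used by the start! code shown on this page, so nil is enough here):
# source = EmPipelines::AmqpEventSource.new(nil, queue, "order_created", monitoring)
```

Calling `queue.deliver(FakeHeader.new, '{"id": 1}')` after `start!` would then exercise the subscribe block without a running broker.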
Instance Method Details
#start! ⇒ Object
# File 'lib/empipelines/amqp_event_source.rb', line 14
def start!
  @queue.subscribe(:ack => true) do |header, json_payload|
    begin
      message = Message.new({
        :origin => @queue.name,
        :payload => JSON.parse(json_payload),
        :event => @event_name,
        :started_at => Time.now.to_i
      })
      # Settle the delivery according to how the pipeline handled the message.
      message.on_consumed { |m| header.ack }
      message.on_broken { |m| header.reject(:requeue => false) }
      message.on_rejected { |m| header.reject(:requeue => true) }
      event!(message)
    rescue => exc
      # A payload that cannot be turned into a Message is dropped, not requeued.
      @monitoring.inform_exception!(exc, self, 'removing message from queue')
      header.reject(:requeue => false)
    end
  end
end
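The subscribe block in start! maps each message outcome to a broker action: consumed acknowledges, broken rejects without requeue, rejected rejects with requeue, and any exception while building the message also rejects without requeue. The sketch below replays that mapping without a broker. RecordingHeader, SketchMessage, its settle method and build_message are hypothetical stand-ins; how the real Message class triggers its callbacks is not shown on this page.

```ruby
require "json"

# Hypothetical stand-in for a delivery header; it only records calls.
class RecordingHeader
  attr_reader :calls

  def initialize
    @calls = []
  end

  def ack
    @calls << :ack
  end

  def reject(opts = {})
    @calls << (opts[:requeue] ? :reject_and_requeue : :reject_and_drop)
  end
end

# Hypothetical stand-in for Message: stores one callback per outcome.
class SketchMessage
  attr_reader :payload

  def initialize(payload)
    @payload = payload
    @callbacks = {}
  end

  def on_consumed(&block)
    @callbacks[:consumed] = block
  end

  def on_broken(&block)
    @callbacks[:broken] = block
  end

  def on_rejected(&block)
    @callbacks[:rejected] = block
  end

  # Hypothetical trigger, used here only to fire the stored callback.
  def settle(outcome)
    @callbacks.fetch(outcome).call(self)
  end
end

# Mirrors the body of the subscribe block in start!.
def build_message(header, json_payload)
  message = SketchMessage.new(JSON.parse(json_payload))
  message.on_consumed { |m| header.ack }
  message.on_broken { |m| header.reject(:requeue => false) }
  message.on_rejected { |m| header.reject(:requeue => true) }
  message
rescue StandardError
  # Unparseable payloads are dropped rather than redelivered.
  header.reject(:requeue => false)
  nil
end

# Fire each outcome once against a fresh header and collect what was recorded.
outcomes = [:consumed, :broken, :rejected].map do |outcome|
  header = RecordingHeader.new
  build_message(header, '{"id": 1}').settle(outcome)
  [outcome, header.calls]
end

# A payload that is not valid JSON takes the rescue path.
bad_header = RecordingHeader.new
build_message(bad_header, "not json")
```

Rejecting with requeue only for the rejected outcome keeps malformed or permanently failing messages from being redelivered in a loop, while messages the pipeline merely declined go back to the queue.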