Class: Ocular::Inputs::RabbitMQ::Input::DSLProxy
- Inherits:
-
Object
- Object
- Ocular::Inputs::RabbitMQ::Input::DSLProxy
- Defined in:
- lib/ocular/inputs/rabbitmq_input.rb
Instance Method Summary
-
#initialize(proxy, handler, logger) ⇒ DSLProxy
constructor
A new instance of DSLProxy.
- #subscribe(queue, *settings, &block) ⇒ Object
Constructor Details
#initialize(proxy, handler, logger) ⇒ DSLProxy
Returns a new instance of DSLProxy.
56 57 58 59 60 |
# File 'lib/ocular/inputs/rabbitmq_input.rb', line 56

# Builds a DSLProxy by remembering the three collaborators it is given.
# Nothing else happens at construction time; the values are only stored.
#
# @param proxy   kept as @proxy; #subscribe hands it to the event base and
#                registers events in its +events+ collection
# @param handler kept as @handler; #subscribe reads +conn+ from it to open
#                a channel
# @param logger  kept as @logger; #subscribe passes it to each run context
def initialize(proxy, handler, logger)
  @proxy, @handler = proxy, handler
  @logger = logger
end
Instance Method Details
#subscribe(queue, *settings, &block) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/ocular/inputs/rabbitmq_input.rb', line 62

# Subscribes +block+ to a RabbitMQ queue and registers it as an event.
#
# The block is wrapped in an Ocular::DSL::EventBase. A channel is opened on
# the handler's connection, the queue is declared with the given settings and
# a manually-acknowledged consumer is attached. For every delivery a fresh
# RabbitMQRunContext is populated with the delivery info, metadata and
# payload, and the event base is executed against it.
#
# A delivery is acknowledged only after the event ran without raising. On a
# StandardError the delivery is rejected with requeue, after a one second
# pause and a warning.
#
# NOTE(review): the three +metadata+ identifiers below were missing from the
# rendered source; the name is taken from the surviving :metadata key -
# confirm against the repository.
#
# @param queue    name of the queue to consume from
# @param settings extra arguments forwarded unchanged to +ch.queue+
# @param block    event body run for each message
# @return [String] the id under which the event was stored in @proxy.events
def subscribe(queue, *settings, &block)
  eventbase = Ocular::DSL::EventBase.new(@proxy, &block)
  ::Ocular.logger.debug "rabbitmq.subscribe to# #{queue} for block #{block}"

  ch = @handler.conn.create_channel
  q = ch.queue(queue, *settings)

  # Manual ack: the message is settled explicitly below, once the outcome of
  # the event is known.
  q.subscribe(:manual_ack => true) do |delivery_info, metadata, payload|
    context = RabbitMQRunContext.new(@logger)
    context.after_fork
    context.log_cause(
      "rabbitmq.subscribe(#{queue})",
      {:delivery_info => delivery_info, :metadata => metadata, :payload => payload}
    )
    context.delivery_info = delivery_info
    context.metadata = metadata
    context.payload = payload

    begin
      eventbase.exec(context)
      # Second argument false: settle only this delivery tag.
      ch.acknowledge(delivery_info.delivery_tag, false)
    rescue StandardError => e
      # Presumably the pause keeps a permanently failing message from being
      # redelivered in a tight loop - TODO confirm intent.
      sleep 1
      warn "Error on RabbitMQ event processing on context #{context}. Error: #{e}"
      # Second argument true: ask the broker to requeue the message.
      ch.reject(delivery_info.delivery_tag, true)
    end
  end

  # Interpolation (rather than String#+) also accepts non-String queue names.
  id = "#{queue}-#{block}"
  @proxy.events[id] = eventbase
  id
end