Class: Ocular::Inputs::RabbitMQ::Input::DSLProxy
- Inherits:
-
Object
- Object
- Ocular::Inputs::RabbitMQ::Input::DSLProxy
- Defined in:
- lib/ocular/inputs/rabbitmq_input.rb
Instance Method Summary
-
#initialize(proxy, handler, logger) ⇒ DSLProxy
constructor
A new instance of DSLProxy.
- #subscribe(queue, *settings, &block) ⇒ Object
Constructor Details
#initialize(proxy, handler, logger) ⇒ DSLProxy
Returns a new instance of DSLProxy.
55 56 57 58 59 |
# File 'lib/ocular/inputs/rabbitmq_input.rb', line 55
#
# Builds a DSLProxy by storing the three collaborators that #subscribe uses.
#
# @param proxy [Object] stored as @proxy; #subscribe passes it to
#   Ocular::DSL::EventBase.new and writes registered events into proxy.events
# @param handler [Object] stored as @handler; #subscribe calls
#   handler.conn.create_channel on it
# @param logger [Object] stored as @logger; #subscribe hands it to each
#   RabbitMQRunContext it creates
def initialize(proxy, handler, logger)
  @proxy = proxy
  @handler = handler
  @logger = logger
end
Instance Method Details
#subscribe(queue, *settings, &block) ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/ocular/inputs/rabbitmq_input.rb', line 61
#
# Registers +block+ as the handler for messages delivered to a RabbitMQ queue.
#
# Wraps the block in an Ocular::DSL::EventBase, opens a channel on the
# handler's connection, obtains the queue named +queue+ (forwarding
# +settings+), and subscribes to it with manual acknowledgements. For every
# delivery a fresh RabbitMQRunContext is populated with the delivery info,
# metadata and payload, the event is executed with that context, and the
# delivery is then acknowledged.
#
# @param queue [String] queue name (presumably a String: it is concatenated
#   with "-" to build the returned id)
# @param settings [Array] extra arguments forwarded verbatim to the
#   channel's +queue+ call
# @param block [Proc] event body wrapped by the EventBase
# @return [String] the id ("<queue>-<block.to_s>") under which the event was
#   stored in +@proxy.events+
def subscribe(queue, *settings, &block)
  eventbase = Ocular::DSL::EventBase.new(@proxy, &block)
  ::Ocular.logger.debug "rabbitmq.subscribe to# #{queue} for block #{block}"

  ch = @handler.conn.create_channel
  q = ch.queue(queue, *settings)

  q.subscribe(:manual_ack => true) do |delivery_info, metadata, payload|
    context = RabbitMQRunContext.new(@logger)
    context.log_cause("rabbitmq.subscribe(#{queue})",
                      {:delivery_info => delivery_info, :metadata => metadata, :payload => payload})
    context.delivery_info = delivery_info
    context.metadata = metadata
    context.payload = payload

    eventbase.exec(context)

    # Acknowledge only after the event has run. If exec raises, this line is
    # skipped and the delivery stays unacknowledged. The second argument is
    # presumably the client's "multiple" flag (false = this delivery only) —
    # confirm against the RabbitMQ client in use.
    ch.acknowledge(delivery_info.delivery_tag, false)
  end

  id = queue + "-" + block.to_s
  @proxy.events[id] = eventbase
  id
end