Class: Ocular::Inputs::RabbitMQ::Input::DSLProxy

Inherits:
Object
  • Object
show all
Defined in:
lib/ocular/inputs/rabbitmq_input.rb

Instance Method Summary collapse

Constructor Details

#initialize(proxy, handler, logger) ⇒ DSLProxy

Returns a new instance of DSLProxy.



55
56
57
58
59
# File 'lib/ocular/inputs/rabbitmq_input.rb', line 55

def initialize(proxy, handler, logger)
    @proxy = proxy
    @handler = handler
    @logger = logger
end

Instance Method Details

#subscribe(queue, *settings, &block) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/ocular/inputs/rabbitmq_input.rb', line 61

def subscribe(queue, *settings, &block)
    eventbase = Ocular::DSL::EventBase.new(@proxy, &block)
    ::Ocular.logger.debug "rabbitmq.subscribe to# #{queue} for block #{block}"

    ch = @handler.conn.create_channel
    q  = ch.queue(queue, *settings)

    q.subscribe(:manual_ack => true) do |delivery_info, , payload|
        context = RabbitMQRunContext.new(@logger)
        context.log_cause("rabbitmq.subscribe(#{queue})", {:delivery_info => delivery_info, :metadata => , :payload => payload})
        context.delivery_info = delivery_info
        context. = 
        context.payload = payload
        eventbase.exec(context)
        ch.acknowledge(delivery_info.delivery_tag, false)
    end

    id = queue + "-" + block.to_s

    @proxy.events[id] = eventbase

    return id
end