Class: Ocular::Inputs::RabbitMQ::Input::DSLProxy

Inherits:
Object
  • Object
show all
Defined in:
lib/ocular/inputs/rabbitmq_input.rb

Instance Method Summary collapse

Constructor Details

#initialize(proxy, handler, logger) ⇒ DSLProxy

Returns a new instance of DSLProxy.



56
57
58
59
60
# File 'lib/ocular/inputs/rabbitmq_input.rb', line 56

def initialize(proxy, handler, logger)
    @proxy = proxy
    @handler = handler
    @logger = logger
end

Instance Method Details

#subscribe(queue, *settings, &block) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/ocular/inputs/rabbitmq_input.rb', line 62

def subscribe(queue, *settings, &block)
    eventbase = Ocular::DSL::EventBase.new(@proxy, &block)
    ::Ocular.logger.debug "rabbitmq.subscribe to# #{queue} for block #{block}"

    ch = @handler.conn.create_channel
    q  = ch.queue(queue, *settings)

    q.subscribe(:manual_ack => true) do |delivery_info, , payload|
        context = RabbitMQRunContext.new(@logger)
        context.log_cause("rabbitmq.subscribe(#{queue})", {:delivery_info => delivery_info, :metadata => , :payload => payload})
        context.delivery_info = delivery_info
        context. = 
        context.payload = payload
        begin
            eventbase.exec(context)
            ch.acknowledge(delivery_info.delivery_tag, false)
        rescue 
            sleep 1
            warn "Error on RabbitMQ event processing on context #{context}"
            ch.reject(delivery_info.delivery_tag, true)
        end
    end

    id = queue + "-" + block.to_s

    @proxy.events[id] = eventbase

    return id
end