Class: LogStash::Inputs::Nsq

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/nsq.rb

Instance Method Summary collapse

Instance Method Details

#registerObject



16
17
18
# File 'lib/logstash/inputs/nsq.rb', line 16

def register
 @logger.info('Registering nsq', :channel => @channel, :topic => @topic, :nsqlookupd => @nsqlookupd)
end

#run(logstash_queue) ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/logstash/inputs/nsq.rb', line 21

def run(logstash_queue)
  @logger.info('Running nsq', :channel => @channel, :topic => @topic, :nsqlookupd => @nsqlookupd)
  begin
    begin
      consumer = Nsq::Consumer.new(
         :nsqlookupd => @nsqlookupd,
         :topic => @topic,
         :channel => @channel
      )
      while true
        #@logger.info('consuming...')
        event = consumer.pop
        #@logger.info('processing:', :event => event)
        queue_event(event.body, logstash_queue)
 event.finish
      end
    rescue LogStash::ShutdownSignal
      @logger.info('nsq got shutdown signal')
    end
    @logger.info('Done running nsq input')
  rescue => e
    @logger.warn('client threw exception, restarting',
                 :exception => e)
    retry
  end
  finished
end