21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
# File 'lib/logstash/inputs/nsq.rb', line 21
# Consume messages from NSQ and push them onto the Logstash queue.
#
# Builds an Nsq::Consumer from @nsqlookupd, @topic and @channel, then pops
# messages forever: each message body is handed to queue_event and the
# message is marked finished afterwards. The loop exits only when
# LogStash::ShutdownSignal is raised; any other StandardError is logged and
# the whole consume cycle is restarted with a fresh consumer.
#
# @param logstash_queue the queue object forwarded to queue_event
# @return whatever `finished` returns
def run(logstash_queue)
  @logger.info('Running nsq', :channel => @channel, :topic => @topic, :nsqlookupd => @nsqlookupd)
  begin
    # Declared outside the inner begin so the ensure clause can see it even
    # when Nsq::Consumer.new itself raises.
    consumer = nil
    begin
      consumer = Nsq::Consumer.new(
        :nsqlookupd => @nsqlookupd,
        :topic => @topic,
        :channel => @channel
      )
      # `while true` rather than `loop do`: Kernel#loop silently swallows
      # StopIteration, which would bypass the restart handling below.
      while true
        event = consumer.pop
        queue_event(event.body, logstash_queue)
        event.finish
      end
    rescue LogStash::ShutdownSignal
      @logger.info('nsq got shutdown signal')
    ensure
      # Release the consumer on every exit path (shutdown or error) so that
      # restarts do not pile up abandoned consumers and their connections.
      begin
        consumer.terminate if consumer
      rescue => terminate_error
        # A failed cleanup must not turn a clean shutdown into a restart.
        @logger.warn('failed to terminate nsq consumer', :exception => terminate_error)
      end
    end
    @logger.info('Done running nsq input')
  rescue => e
    @logger.warn('client threw exception, restarting',
                 :exception => e)
    # Brief pause so a persistent failure does not become a tight retry loop.
    sleep(1)
    retry
  end
  finished
end
|