Class: LogStash::Inputs::Nsq

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/nsq.rb

Instance Method Summary collapse

Instance Method Details

#registerObject



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/logstash/inputs/nsq.rb', line 20

def register
  @logger.info('Registering nsq', :channel => @channel, :topic => @topic, :nsqlookupd => @nsqlookupd)
  if @tls_key and @tls_cert
    @consumer = Nsq::Consumer.new(
     :nsqlookupd => @nsqlookupd,
     :topic => @topic,
     :channel => @channel,
     :max_in_flight => @max_in_flight,
     :tls_v1 => @tls_v1,
     :tls_context => {
      key: @tls_key,
      certificate: @tls_cert
     }
    )
  else
    @consumer = Nsq::Consumer.new(
     :nsqlookupd => @nsqlookupd,
     :topic => @topic,
     :channel => @channel,
     :tls_v1 => @tls_v1,
     :max_in_flight => @max_in_flight
    )
  end
end

#run(logstash_queue) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/logstash/inputs/nsq.rb', line 46

def run(logstash_queue)
  @logger.info('Running nsq', :channel => @channel, :topic => @topic, :nsqlookupd => @nsqlookupd)
  begin
     while !stop?
        #@logger.info('consuming...')
        event = @consumer.pop
        #@logger.info('processing:', :event => event)
        queue_event(event.body, logstash_queue)
        event.finish
     end
     @logger.warn('Done running nsq input')
  end
end

#stopObject



83
84
85
86
# File 'lib/logstash/inputs/nsq.rb', line 83

def stop
  @logger.warn('nsq got stop signal. terminate')
  @consumer.terminate
end