Class: Fluent::NSQOutput
- Inherits: BufferedOutput
- Inheritance hierarchy: Object → BufferedOutput → Fluent::NSQOutput
- Defined in:
- lib/fluent/plugin/out_nsq.rb
Instance Method Summary
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #initialize ⇒ NSQOutput (constructor): A new instance of NSQOutput.
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ NSQOutput
Returns a new instance of NSQOutput.
11 12 13 |
# File 'lib/fluent/plugin/out_nsq.rb', line 11
# Constructs the output plugin. No state is set up here; construction is
# delegated entirely to the parent class.
def initialize
  super()
end
Instance Method Details
#configure(conf) ⇒ Object
15 16 17 18 19 20 |
# File 'lib/fluent/plugin/out_nsq.rb', line 15
# Applies the configuration via the parent class, then validates that the
# settings this plugin depends on are present.
#
# A setting counts as missing when it is nil/false or when it is blank
# (empty or whitespace-only): a blank @nsqlookupd would split into an empty
# address list in #start, and a blank topic is not a usable topic name.
#
# @param conf the plugin configuration, passed through to the parent class
# @raise [ConfigError] "Missing nsqlookupd" or "Missing topic"
def configure(conf)
  super

  required = { 'nsqlookupd' => @nsqlookupd, 'topic' => @topic }
  required.each do |name, value|
    raise ConfigError, "Missing #{name}" if !value || value.to_s.strip.empty?
  end
end
#format(tag, time, record) ⇒ Object
34 35 36 |
# File 'lib/fluent/plugin/out_nsq.rb', line 34
# Serializes a single event for buffering.
#
# The event is packed as a three-element array [tag, time, record] using
# to_msgpack; #write later iterates the buffered data as
# (tag, time, record) triples.
#
# @param tag the event tag
# @param time the event time
# @param record the event payload
# @return the msgpack-encoded triple
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
#shutdown ⇒ Object
30 31 32 |
# File 'lib/fluent/plugin/out_nsq.rb', line 30
# Stops the plugin: runs the parent class's shutdown, then terminates the
# NSQ producer.
#
# The parent runs first so that anything it still hands to #write during
# shutdown finds a live producer (NOTE(review): assumes the base class may
# flush during shutdown — confirm against the Fluentd version in use).
# The producer is only terminated if #start managed to create it, so a
# failed start does not surface here as a NoMethodError on nil.
def shutdown
  super
  @producer.terminate if @producer
end
#start ⇒ Object
22 23 24 25 26 27 28 |
# File 'lib/fluent/plugin/out_nsq.rb', line 22
# Starts the plugin: runs the parent class's startup, then creates the NSQ
# producer that #write publishes through.
#
# @nsqlookupd is a comma-separated list of lookupd addresses; it is split
# into an array and passed to Nsq::Producer together with @topic.
def start
  super

  lookupds = @nsqlookupd.split(',')
  # The option key is `nsqlookupd:` (with a "q"). The previous spelling,
  # `nslookupd:`, meant the configured addresses were never passed under the
  # key the producer is documented to read.
  @producer = Nsq::Producer.new(
    nsqlookupd: lookupds,
    topic: @topic
  )
end
#write(chunk) ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/fluent/plugin/out_nsq.rb', line 38
# Publishes every Hash record in +chunk+ through @producer as a JSON string.
#
# Each record is copied, annotated with the event tag under "_key" and the
# event time under "_ts", and serialized with to_json. Records that are not
# Hashes are skipped. An empty chunk is a no-op.
#
# @param chunk buffered events; must respond to #empty? and to #msgpack_each
#   yielding (tag, time, record)
def write(chunk)
  return if chunk.empty?

  chunk.msgpack_each do |tag, time, record|
    next unless record.is_a?(Hash)

    # Drop any "_key"/"_ts" the record already carries (string or symbol
    # form) and set the metadata under string keys. Merging symbol keys into
    # a string-keyed record would serialize both and emit duplicate JSON keys.
    # TODO: get rid of this extra copy
    tagged_record = record.reject { |key, _| %w[_key _ts].include?(key.to_s) }
    tagged_record['_key'] = tag
    tagged_record['_ts'] = time

    @producer.write(tagged_record.to_json)
  end
end