Class: LogStash::Inputs::Nats
- Inherits:
-
Base
- Object
- Base
- LogStash::Inputs::Nats
- Defined in:
- lib/logstash/inputs/nats.rb
Overview
This input plugin will read events from a NATS instance; it does not support NATS streaming instance. This plugin used the following ruby nats client: github.com/nats-io/ruby-nats
For more information about Nats, see <nats.io>
Examples:
- source,ruby
-
input {
# Read events on subject "example" by using an "url" without authentication nats { url => "nats://localhost:4222" subjects => ["example"] }
}
- source,ruby
-
input {
# Read events on subject "example" by using an "url" with authentication nats { url => "nats://user:passwd@localhost:4222" subjects => ["example"] }
}
- source,ruby
-
input {
# Read events on two subjects by using other paramaters nats { host => "localhost" port => 4222 user => "user" pass => "password" subjects => [ "first", "second" ] }
}
Instance Method Summary collapse
Instance Method Details
#register ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/logstash/inputs/nats.rb', line 104 def register @nats_server = build_nats_server @nats_config = { uri: @nats_server, ssl: @ssl, pedantic: @pedantic, verbose: @verbose, reconnect_time_wait: @reconnect_time_wait.nil? ? nil : @reconnect_time_wait.value, max_reconnect_attempts: @max_reconnect_attempts.nil? ? nil : @max_reconnect_attempts.value } end |
#run(queue) ⇒ Object
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/logstash/inputs/nats.rb', line 119 def run(queue) ['TERM', 'INT'].each { |s| trap(s) { puts; exit! } } NATS.on_error { |err| puts "Server Error: #{err}"; exit! } NATS.start(@nats_config) do @subjects.each do |subject| puts "Listening on [#{subject}]" #unless $show_raw NATS.subscribe(subject, :queue => @queue_group ) do |msg, _, sub| @codec.decode(msg) do |event| decorate(event) event.set("nats_subject", sub) queue << event end end end end end |