Class: Fluent::Plugin::NATSInput
- Inherits: Input
  - Object
    - Input
      - Fluent::Plugin::NATSInput
- Defined in:
- lib/fluent/plugin/in_nats.rb
Instance Method Summary
Instance Method Details
#configure(conf) ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/fluent/plugin/in_nats.rb', line 31

# Runs the parent class's configuration step, then assembles the option
# hash that #run later hands to NATS.connect.
#
# @param conf the plugin configuration, forwarded unchanged to the parent
#   via the bare `super`
# @return [Hash] the connection options stored in @nats_config
def configure(conf)
  super

  # The server URI is derived from the host and port instance variables.
  server_uri = "nats://#{@host}:#{@port}"

  @nats_config = {
    uri: server_uri,
    ssl: @ssl,
    user: @user,
    pass: @password,
    reconnect_time_wait: @reconnect_time_wait,
    max_reconnect_attempts: @max_reconnect_attempts
  }
end
#run ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/fluent/plugin/in_nats.rb', line 62

# Connects to NATS on the next EventMachine tick and subscribes to every
# entry in @queues. Each received message is emitted through the router
# under the tag "<@tag>.<sub>", where `sub` is the third argument NATS
# passes to the subscription block.
#
# The payload is parsed as JSON. If parsing fails, the error is logged and
# the raw message string is emitted instead. A payload that parses to nil
# (JSON `null`) is emitted as an empty hash.
def run
  EM.next_tick do
    # The block given to NATS.connect reads @nats_conn, which is assigned
    # from the return value of this very call.
    @nats_conn = NATS.connect(@nats_config) do
      @queues.each do |queue|
        @nats_conn.subscribe(queue) do |msg, _reply, sub|
          tag = "#{@tag}.#{sub}"

          begin
            message = JSON.parse(msg)
          rescue JSON::ParserError => e
            log.error "Failed parsing JSON #{e.inspect}. Passing as a normal string"
            message = msg
          end

          time = Engine.now
          router.emit(tag, time, message || {})
        end
      end
    end
  end
end
#shutdown ⇒ Object
56 57 58 59 60 |
# File 'lib/fluent/plugin/in_nats.rb', line 56

# Closes the NATS connection if one exists, stops the EventMachine reactor
# if it is running, then runs the parent class's shutdown.
def shutdown
  # @nats_conn is only assigned inside the EM.next_tick block in #run, so it
  # can still be nil here. Guarding keeps the reactor stop and the parent
  # shutdown from being skipped by a NoMethodError.
  @nats_conn.close if @nats_conn
  EM.stop if EM.reactor_running?
  super
end
#start ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/fluent/plugin/in_nats.rb', line 44

# Runs the parent class's start step, registers a NATS error handler,
# starts the reactor thread, and launches #run on a thread named
# :nats_input_main.
def start
  super

  NATS.on_error do |error|
    log.error "Server Error:", error: error
    # supervisor will restart worker
    exit!
  end

  run_reactor_thread
  thread_create(:nats_input_main, &method(:run))

  # The server URI is built in #configure and stored in @nats_config[:uri].
  # No visible code assigns @uri, so the log line reads the URI from there.
  # NOTE(review): @queue is not assigned in the visible code either (#run
  # subscribes to @queues) — confirm it is still a configured parameter.
  log.info "listening nats on #{@nats_config[:uri]}/#{@queue}"
end