Class: Fluent::Plugin::NATSInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_nats.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/fluent/plugin/in_nats.rb', line 31

def configure(conf)
  super

  @nats_config = {
    uri: "nats://#{@host}:#{@port}",
    ssl: @ssl,
    user: @user,
    pass: @password,
    reconnect_time_wait: @reconnect_time_wait,
    max_reconnect_attempts: @max_reconnect_attempts,
  }
end

#runObject



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/fluent/plugin/in_nats.rb', line 62

def run
  EM.next_tick do
    @nats_conn = NATS.connect(@nats_config) do
      @queues.each do |queue|
        @nats_conn.subscribe(queue) do |msg, _reply, sub|
          tag = "#{@tag}.#{sub}"
          begin
            message = JSON.parse(msg)
          rescue JSON::ParserError => e
            log.error "Failed parsing JSON #{e.inspect}.  Passing as a normal string"
            message = msg
          end
          time = Engine.now
          router.emit(tag, time, message || {})
        end
      end
    end
  end
end

#shutdownObject



56
57
58
59
60
# File 'lib/fluent/plugin/in_nats.rb', line 56

def shutdown
  @nats_conn.close
  EM.stop if EM.reactor_running?
  super
end

#startObject



44
45
46
47
48
49
50
51
52
53
54
# File 'lib/fluent/plugin/in_nats.rb', line 44

def start
  super
  NATS.on_error do |error|
    log.error "Server Error:", error: error
    # supervisor will restart worker
    exit!
  end
  run_reactor_thread
  thread_create(:nats_input_main, &method(:run))
  log.info "listening nats on #{@uri}/#{@queue}"
end