Class: Fluent::Plugin::NATSOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_nats.rb

Constant Summary collapse

DEFAULT_FORMAT_TYPE =
'json'

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#formatterObject

Returns the value of attribute formatter.



37
38
39
# File 'lib/fluent/plugin/out_nats.rb', line 37

def formatter
  @formatter
end

Instance Method Details

#configure(conf) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/fluent/plugin/out_nats.rb', line 39

def configure(conf)
  super

  @nats_config = {
    uri: "nats://#{@host}:#{@port}",
    ssl: @ssl,
    user: @user,
    pass: @password,
    reconnect_time_wait: @reconnect_time_wait,
    max_reconnect_attempts: @max_reconnect_attempts,
  }
  @formatter = formatter_create
end

#format(tag, time, record) ⇒ Object



85
86
87
# File 'lib/fluent/plugin/out_nats.rb', line 85

def format(tag, time, record)
  @formatter.format(tag, time, record)
end

#multi_workers_ready?Boolean

Returns:

  • (Boolean)


33
34
35
# File 'lib/fluent/plugin/out_nats.rb', line 33

def multi_workers_ready?
  true
end

#process(tag, es) ⇒ Object



76
77
78
79
80
81
82
83
# File 'lib/fluent/plugin/out_nats.rb', line 76

def process(tag, es)
  es = inject_values_to_event_stream(tag, es)
  es.each do |time,record|
    EM.next_tick do
      NATS.publish(tag, format(tag, time, record))
    end
  end
end

#runObject



65
66
67
68
69
70
71
72
73
74
# File 'lib/fluent/plugin/out_nats.rb', line 65

def run
  NATS.on_error do |error|
    log.error "Server Error:", error: error
    # supervisor will restart worker
    exit!
  end
  NATS.start(@nats_config) do
    log.info "nats client is running for #{@nats_config[:uri]}"
  end
end

#shutdownObject



58
59
60
61
62
63
# File 'lib/fluent/plugin/out_nats.rb', line 58

def shutdown
  EM.next_tick do
    NATS.stop
  end
  super
end

#startObject



53
54
55
56
# File 'lib/fluent/plugin/out_nats.rb', line 53

def start
  super
  thread_create(:nats_output_main, &method(:run))
end