Class: Fluent::Plugin::NATSOutput
- Inherits: Output
- Inheritance hierarchy: Object → Output → Fluent::Plugin::NATSOutput
- Defined in:
- lib/fluent/plugin/out_nats.rb
Constant Summary collapse
- DEFAULT_FORMAT_TYPE = 'json'
Instance Attribute Summary collapse
-
#formatter ⇒ Object
Returns the value of attribute formatter.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #multi_workers_ready? ⇒ Boolean
- #process(tag, es) ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Instance Attribute Details
#formatter ⇒ Object
Returns the value of attribute formatter.
37 38 39 |
# File 'lib/fluent/plugin/out_nats.rb', line 37
#
# Reader for the formatter attribute.
#
# @return [Object] the current value of @formatter (nil until it is assigned;
#   #configure assigns it from formatter_create)
def formatter
  @formatter
end
Instance Method Details
#configure(conf) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/fluent/plugin/out_nats.rb', line 39
#
# Builds the NATS connection settings and the record formatter.
#
# Calls the parent implementation first, then stores the connection options
# in @nats_config and the formatter in @formatter.
#
# @param conf [Object] plugin configuration, passed through to super
#   (presumably a Fluentd config element -- confirm against the base class)
# @return [Object] the value returned by formatter_create
def configure(conf)
  super

  # NOTE(review): @host, @port, @ssl, @user, @password, @reconnect_time_wait and
  # @max_reconnect_attempts are not assigned in this method; presumably they are
  # populated from plugin parameters by `super` -- confirm.
  @nats_config = {
    uri: "nats://#{@host}:#{@port}",
    ssl: @ssl,
    user: @user,
    pass: @password,
    reconnect_time_wait: @reconnect_time_wait,
    max_reconnect_attempts: @max_reconnect_attempts,
  }

  @formatter = formatter_create
end
#format(tag, time, record) ⇒ Object
85 86 87 |
# File 'lib/fluent/plugin/out_nats.rb', line 85
#
# Formats a single event by delegating to the configured formatter.
#
# @param tag [Object] event tag, forwarded unchanged
# @param time [Object] event time, forwarded unchanged
# @param record [Object] event record, forwarded unchanged
# @return [Object] whatever @formatter.format returns
def format(tag, time, record)
  @formatter.format(tag, time, record)
end
#multi_workers_ready? ⇒ Boolean
33 34 35 |
# File 'lib/fluent/plugin/out_nats.rb', line 33
#
# Reports that this plugin may be used with multiple workers.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end
#process(tag, es) ⇒ Object
76 77 78 79 80 81 82 83 |
# File 'lib/fluent/plugin/out_nats.rb', line 76
#
# Publishes every event of the stream to NATS.
#
# The stream is first passed through inject_values_to_event_stream. Each
# (time, record) pair is then formatted with #format and handed to
# NATS.publish, with the event tag as the first argument.
#
# @param tag [Object] event tag; also the first argument to NATS.publish
# @param es [Object] event stream; must respond to #each yielding (time, record)
# @return [Object] the result of es.each
def process(tag, es)
  es = inject_values_to_event_stream(tag, es)
  es.each do |time, record|
    # The publish (and the formatting) is deferred to EM.next_tick rather than
    # executed inline in the calling thread.
    EM.next_tick do
      NATS.publish(tag, format(tag, time, record))
    end
  end
end
#run ⇒ Object
65 66 67 68 69 70 71 72 73 74 |
# File 'lib/fluent/plugin/out_nats.rb', line 65
#
# Registers the NATS error handler and starts the NATS client.
#
# Passed to thread_create by #start, so it is expected to run in the
# :nats_output_main thread.
#
# @return [Object] the result of NATS.start
def run
  NATS.on_error do |error|
    log.error "Server Error:", error: error
    # supervisor will restart worker
    exit!
  end

  NATS.start(@nats_config) do
    log.info "nats client is running for #{@nats_config[:uri]}"
  end
end
#shutdown ⇒ Object
58 59 60 61 62 63 |
# File 'lib/fluent/plugin/out_nats.rb', line 58
#
# Stops the NATS client and then runs the parent shutdown.
#
# @return [Object] the value returned by super
def shutdown
  # NATS.stop is scheduled through EM.next_tick instead of being called
  # directly from the thread that invokes shutdown.
  EM.next_tick do
    NATS.stop
  end

  super
end
#start ⇒ Object
53 54 55 56 |
# File 'lib/fluent/plugin/out_nats.rb', line 53
#
# Starts the plugin: runs the parent start, then hands #run to thread_create
# under the name :nats_output_main.
#
# @return [Object] the value returned by thread_create
def start
  super
  thread_create(:nats_output_main, &method(:run))
end