Class: LogStash::Outputs::Fluentd
- Inherits:
-
Base
- Object
- Base
- LogStash::Outputs::Fluentd
- Includes:
- Stud::Buffer
- Defined in:
- lib/logstash/outputs/fluentd.rb
Constant Summary collapse
- VERSION =
Gem::Specification.load(File.('../../../../logstash-output-fluentd.gemspec', __FILE__)).version
Instance Method Summary collapse
- #close ⇒ Object
- #flush(events, teardown = false) ⇒ Object
- #on_flush_error(e) ⇒ Object
- #on_full_buffer_error(opts = {}) ⇒ Object
- #receive(event) ⇒ Object
- #register ⇒ Object
Instance Method Details
#close ⇒ Object
88 89 90 |
# File 'lib/logstash/outputs/fluentd.rb', line 88 def close buffer_flush(final: true) end |
#flush(events, teardown = false) ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/logstash/outputs/fluentd.rb', line 65 def flush(events, teardown = false) @logger.debug "flushing #{events} events" return if events.size < 1 data = [@tag, events.join].to_msgpack @logger.debug "sending chunk #{data.bytesize} bytes" connect.write data @logger.debug "done" end |
#on_flush_error(e) ⇒ Object
78 79 80 |
# File 'lib/logstash/outputs/fluentd.rb', line 78 def on_flush_error(e) @logger.warn "flush error #{e.class}: #{e.}" end |
#on_full_buffer_error(opts = {}) ⇒ Object
83 84 85 |
# File 'lib/logstash/outputs/fluentd.rb', line 83 def on_full_buffer_error(opts={}) @logger.warn "buffer exceeds limits: pending:#{opts[:pending_count]}, outgoing:#{opts[:outgoing_count]}" end |
#receive(event) ⇒ Object
55 56 57 58 59 60 61 62 |
# File 'lib/logstash/outputs/fluentd.rb', line 55 def receive(event) @logger.debug "receive a event" entry = [(event..to_i || Time.now.to_i), event] buffer_receive(entry.to_msgpack) @logger.debug "buffered a event" end |
#register ⇒ Object
46 47 48 49 50 51 52 |
# File 'lib/logstash/outputs/fluentd.rb', line 46 def register buffer_initialize( max_items: @flush_size, max_interval: @flush_interval, logger: @logger ) end |