Class: LogStash::Outputs::Fluentd
- Inherits:
-
Base
- Object
- Base
- LogStash::Outputs::Fluentd
- Includes:
- Stud::Buffer
- Defined in:
- lib/logstash/outputs/fluentd.rb
Constant Summary collapse
- VERSION =
Gem::Specification.load(File.expand_path('../../../../logstash-output-fluentd.gemspec', __FILE__)).version
Instance Method Summary collapse
- #close ⇒ Object
- #convert_from_event_to_msgpack(event) ⇒ Object
- #flush(events, teardown = false) ⇒ Object
- #on_flush_error(e) ⇒ Object
- #on_full_buffer_error(opts = {}) ⇒ Object
- #receive(event) ⇒ Object
- #register ⇒ Object
Instance Method Details
#close ⇒ Object
87 88 89 |
# File 'lib/logstash/outputs/fluentd.rb', line 87 def close buffer_flush(final: true) end |
#convert_from_event_to_msgpack(event) ⇒ Object
36 37 38 39 40 41 42 43 |
# File 'lib/logstash/outputs/fluentd.rb', line 36 def convert_from_event_to_msgpack(event) entry = [(event.timestamp.to_i || Time.now.to_i), event.to_hash] begin entry.to_msgpack rescue ArgumentError, NoMethodError LogStash::Json.load(LogStash::Json.dump(entry)).to_msgpack end end |
#flush(events, teardown = false) ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/logstash/outputs/fluentd.rb', line 64 def flush(events, teardown = false) @logger.debug "flushing #{events} events" return if events.size < 1 data = [@tag, events.join].to_msgpack @logger.debug "sending chunk #{data.bytesize} bytes" connect.write data @logger.debug "done" end |
#on_flush_error(e) ⇒ Object
77 78 79 |
# File 'lib/logstash/outputs/fluentd.rb', line 77 def on_flush_error(e) @logger.warn "flush error #{e.class}: #{e.message}" end |
#on_full_buffer_error(opts = {}) ⇒ Object
82 83 84 |
# File 'lib/logstash/outputs/fluentd.rb', line 82 def on_full_buffer_error(opts={}) @logger.warn "buffer exceeds limits: pending:#{opts[:pending_count]}, outgoing:#{opts[:outgoing_count]}" end |
#receive(event) ⇒ Object
55 56 57 58 59 60 61 |
# File 'lib/logstash/outputs/fluentd.rb', line 55 def receive(event) @logger.debug "receive a event" buffer_receive(convert_from_event_to_msgpack(event)) @logger.debug "buffered a event" end |
#register ⇒ Object
46 47 48 49 50 51 52 |
# File 'lib/logstash/outputs/fluentd.rb', line 46 def register buffer_initialize( max_items: @flush_size, max_interval: @flush_interval, logger: @logger ) end |