Class: LogStash::Outputs::Fluentd

Inherits:
Base
  • Object
show all
Includes:
Stud::Buffer
Defined in:
lib/logstash/outputs/fluentd.rb

Constant Summary collapse

# Version of this plugin, taken from the gemspec that is located by a path
# relative to this source file.
VERSION =
Gem::Specification.load(File.expand_path('../../../../logstash-output-fluentd.gemspec', __FILE__)).version

Instance Method Summary collapse

Instance Method Details

#close ⇒ Object



88
89
90
# File 'lib/logstash/outputs/fluentd.rb', line 88

# Shuts the output down by asking the buffer for one last flush,
# passing the +final+ option set to true.
def close
  buffer_flush(:final => true)
end

#flush(events, teardown = false) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
# File 'lib/logstash/outputs/fluentd.rb', line 65

# Writes one batch of buffered entries to the object returned by +connect+.
#
# The batch goes out as a single msgpack-encoded two-element array:
# the configured tag followed by all entries joined into one string.
#
# @param events [Array<String>] buffered entries (presumably the msgpack
#   strings queued by #receive -- confirm against the buffer mixin)
# @param teardown [Boolean] accepted so the method matches the buffer
#   callback signature; it is not used here
# @return [Object, nil] nil when the batch is empty, otherwise whatever the
#   final logger call returns
def flush(events, teardown = false)
  # Log the number of entries, not the entries themselves: interpolating
  # +events+ dumped every serialized payload into the debug log.
  @logger.debug "flushing #{events.size} events"
  return if events.empty?

  data = [@tag, events.join].to_msgpack

  @logger.debug "sending chunk #{data.bytesize} bytes"
  connect.write data

  @logger.debug "done"
end

#on_flush_error(e) ⇒ Object



78
79
80
# File 'lib/logstash/outputs/fluentd.rb', line 78

# Reports a failed flush as a warning containing the exception's class
# and message.
#
# @param e [Exception] the error raised while flushing
def on_flush_error(e)
  error_class = e.class
  detail = e.message
  @logger.warn("flush error #{error_class}: #{detail}")
end

#on_full_buffer_error(opts = {}) ⇒ Object



83
84
85
# File 'lib/logstash/outputs/fluentd.rb', line 83

# Warns that the buffer has gone over its limits, including the pending and
# outgoing counts read from +opts+.
#
# @param opts [Hash] may carry :pending_count and :outgoing_count; a missing
#   key is rendered as an empty string in the message
def on_full_buffer_error(opts={})
  pending = opts[:pending_count]
  outgoing = opts[:outgoing_count]
  @logger.warn("buffer exceeds limits: pending:#{pending}, outgoing:#{outgoing}")
end

#receive(event) ⇒ Object



55
56
57
58
59
60
61
62
# File 'lib/logstash/outputs/fluentd.rb', line 55

# Queues a single event for a later flush.
#
# The event is paired with an integer timestamp, the pair is
# msgpack-encoded, and the result is handed to +buffer_receive+.
#
# @param event [#timestamp] the event to buffer; its +timestamp+ may be nil,
#   in which case the current time is used
# @return [Object] whatever the final logger call returns
def receive(event)
  @logger.debug "receive a event"

  # Pick the fallback BEFORE converting: nil.to_i is 0 (and 0 is truthy), so
  # `event.timestamp.to_i || Time.now.to_i` could never reach Time.now and
  # stamped timestamp-less events with epoch 0.
  time = (event.timestamp || Time.now).to_i
  buffer_receive([time, event].to_msgpack)

  @logger.debug "buffered a event"
end

#register ⇒ Object



46
47
48
49
50
51
52
# File 'lib/logstash/outputs/fluentd.rb', line 46

# Sets up the buffer using this output's settings: the flush size becomes
# the item limit, the flush interval becomes the time limit, and the
# plugin logger is passed along.
def register
  item_limit = @flush_size
  interval_limit = @flush_interval
  buffer_initialize(max_items: item_limit, max_interval: interval_limit, logger: @logger)
end