Class: LogStash::Outputs::Fluentd

Inherits:
Base
  • Object
show all
Includes:
Stud::Buffer
Defined in:
lib/logstash/outputs/fluentd.rb

Constant Summary collapse

VERSION =
Gem::Specification.load(File.expand_path('../../../../logstash-output-fluentd.gemspec', __FILE__)).version

Instance Method Summary collapse

Instance Method Details

#close ⇒ Object



87
88
89
# File 'lib/logstash/outputs/fluentd.rb', line 87

# Requests a final buffer flush by invoking +buffer_flush+ with the
# +:final+ option set to true.
#
# @return [Object] whatever +buffer_flush+ returns
def close
  buffer_flush(:final => true)
end

#convert_from_event_to_msgpack(event) ⇒ Object



36
37
38
39
40
41
42
43
# File 'lib/logstash/outputs/fluentd.rb', line 36

# Serializes one event as a msgpack-encoded +[time, record]+ pair, where
# +time+ is an integer number of seconds and +record+ is +event.to_hash+.
#
# @param event [#timestamp, #to_hash] the event to serialize
# @return [Object] the result of +to_msgpack+ on the pair
#   (presumably a binary String - confirm against the msgpack gem)
def convert_from_event_to_msgpack(event)
  # The fallback has to be applied BEFORE +to_i+: +nil.to_i+ is 0 and 0 is
  # truthy, so +timestamp.to_i || now+ would stamp timestamp-less events
  # with epoch 0 instead of the current time.
  time = (event.timestamp || Time.now).to_i
  entry = [time, event.to_hash]
  begin
    entry.to_msgpack
  rescue ArgumentError, NoMethodError
    # Direct packing raised; round-trip the entry through JSON and pack the
    # resulting plain structure instead.
    LogStash::Json.load(LogStash::Json.dump(entry)).to_msgpack
  end
end

#flush(events, teardown = false) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
# File 'lib/logstash/outputs/fluentd.rb', line 64

# Sends one batch of buffered entries over the connection.
#
# The entries are concatenated into a single string, wrapped together with
# +@tag+ as +[tag, payload]+, packed with +to_msgpack+ and written to the
# object returned by +connect+.
#
# @param events [Array<String>] already-serialized entries to send
# @param teardown [Boolean] accepted for the caller's sake; not used here
# @return [Object, nil] nil when there is nothing to send, otherwise the
#   result of the last debug log call
def flush(events, teardown = false)
  # Log the number of entries, not the entries themselves: interpolating
  # the array would dump every packed payload into the log line.
  @logger.debug "flushing #{events.size} events"
  return if events.empty?

  data = [@tag, events.join].to_msgpack

  @logger.debug "sending chunk #{data.bytesize} bytes"
  connect.write data

  @logger.debug "done"
end

#on_flush_error(e) ⇒ Object



77
78
79
# File 'lib/logstash/outputs/fluentd.rb', line 77

# Logs a warning naming the class and message of an error raised while
# flushing.
#
# @param e [Exception] the error to report
# @return [Object] whatever +@logger.warn+ returns
def on_flush_error(e)
  summary = "flush error #{e.class}: #{e.message}"
  @logger.warn(summary)
end

#on_full_buffer_error(opts = {}) ⇒ Object



82
83
84
# File 'lib/logstash/outputs/fluentd.rb', line 82

# Logs a warning with the pending and outgoing counts read from +opts+.
#
# NOTE(review): Stud::Buffer appears to invoke a callback named
# +on_full_buffer_receive+ with +:pending+ / +:outgoing+ keys; confirm this
# method name and these keys are what the buffer actually uses.
#
# @param opts [Hash] expected to carry +:pending_count+ and +:outgoing_count+
# @return [Object] whatever +@logger.warn+ returns
def on_full_buffer_error(opts={})
  pending = opts[:pending_count]
  outgoing = opts[:outgoing_count]
  @logger.warn("buffer exceeds limits: pending:#{pending}, outgoing:#{outgoing}")
end

#receive(event) ⇒ Object



55
56
57
58
59
60
61
# File 'lib/logstash/outputs/fluentd.rb', line 55

# Serializes an incoming event with +convert_from_event_to_msgpack+ and
# hands the result to +buffer_receive+, logging at debug level before and
# after.
#
# @param event [Object] the event to buffer
# @return [Object] whatever the final +@logger.debug+ call returns
def receive(event)
  @logger.debug("receive a event")

  packed = convert_from_event_to_msgpack(event)
  buffer_receive(packed)

  @logger.debug("buffered a event")
end

#register ⇒ Object



46
47
48
49
50
51
52
# File 'lib/logstash/outputs/fluentd.rb', line 46

# Initializes the buffer, passing +@flush_size+ as the item limit,
# +@flush_interval+ as the interval limit, and +@logger+ as the logger.
#
# @return [Object] whatever +buffer_initialize+ returns
def register
  buffer_options = {
    :max_items => @flush_size,
    :max_interval => @flush_interval,
    :logger => @logger
  }
  buffer_initialize(buffer_options)
end