Class: Fluent::Plugin::GelfInput
- Inherits: Input
- Object
- Input
- Fluent::Plugin::GelfInput
- Defined in:
- lib/fluent/plugin/in_gelf.rb
Constant Summary collapse
- DEFAULT_PARSER = 'json'.freeze
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #emit(time, record) ⇒ Object
- #initialize ⇒ GelfInput (constructor): A new instance of GelfInput.
- #listen ⇒ Object
- #receive_data(data, addr) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize ⇒ GelfInput
Returns a new instance of GelfInput.
19 20 21 |
# File 'lib/fluent/plugin/in_gelf.rb', line 19
# Creates the plugin instance. Nothing is set up locally; construction is
# delegated entirely to the parent class.
#
# @return [GelfInput] a new instance of GelfInput
def initialize
  super
end
Instance Method Details
#configure(conf) ⇒ Object
38 39 40 41 42 43 |
# File 'lib/fluent/plugin/in_gelf.rb', line 38
# Applies the plugin configuration and prepares the record parser.
#
# @param conf [Object] configuration element handed in by the framework
# @return [Object] the parser returned by +parser_create+
def configure(conf)
  # Must run before +super+: converts the :parser section of +conf+ in place.
  # NOTE(review): presumably maps old-style flat parser parameters onto a
  # <parse> section -- confirm against the compat_parameters helper.
  compat_parameters_convert(conf, :parser)

  super

  # Parser used by #receive_data to turn a decoded GELF payload into events.
  @parser = parser_create
end
#emit(time, record) ⇒ Object
101 102 103 104 105 |
# File 'lib/fluent/plugin/in_gelf.rb', line 101
# Hands a single event to the router under the configured tag.
#
# Any StandardError raised while emitting is logged (with the tag and a
# JSON dump of the record) instead of being propagated to the caller.
#
# @param time [Object] event time passed through to the router unchanged
# @param record [Hash] event payload
# @return [Object] whatever the router (or, on failure, the logger) returns
def emit(time, record)
  router.emit(@tag, time, record)
rescue StandardError => failure
  log.error(
    'gelf failed to emit',
    error: failure.to_s,
    error_class: failure.class.to_s,
    tag: @tag,
    record: Yajl.dump(record)
  )
end
#listen ⇒ Object
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/fluent/plugin/in_gelf.rb', line 85
# Opens the listening socket for the configured protocol and forwards every
# received payload to #receive_data.
#
# @return [Object] the value returned by +server_create+
def listen
  log.info "listening gelf socket on #{@bind}:#{@port} with #{@protocol_type}"

  case @protocol_type
  when :tcp
    server_create(:in_tcp_server, @port, bind: @bind) do |payload, connection|
      receive_data(payload, connection)
    end
  else
    # Any non-TCP protocol type falls back to UDP.
    # Graylog accepts 8192 bytes of GELF per datagram, and chunked messages
    # add at least 12 bytes of chunk header on top of that. Rather than
    # pinning the limit to that exact figure, allow a generous 65536 bytes.
    server_create(:in_udp_server, @port, proto: :udp, bind: @bind, max_bytes: 65536) do |payload, socket|
      receive_data(payload, socket)
    end
  end
end
#receive_data(data, addr) ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/fluent/plugin/in_gelf.rb', line 55
# Decodes one payload received from the socket and emits the event(s) the
# configured parser extracts from it.
#
# Failures are logged rather than raised: a payload that Gelfd2 cannot decode
# is reported at warn level, and any other StandardError is reported at error
# level together with a dump of the raw payload.
#
# @param data [String] raw payload handed over by the server callback
#   (presumably always a String, since +#dump+ is called on it -- confirm)
# @param addr [Object] second argument of the server callback (connection or
#   socket); not used by this method
# @return [Object, nil] nil when there is nothing to emit
def receive_data(data, addr)
  begin
    msg = Gelfd2::Parser.parse(data)
  rescue => e
    log.warn 'Gelfd failed to parse a message', error: e.to_s
    log.warn_backtrace
  end

  # Gelfd parser will return nil if it received and parsed a non-final chunk.
  # msg is also nil when decoding raised above, so both cases stop here.
  return if msg.nil?

  @parser.parse(msg) do |time, record|
    unless time && record
      log.warn "pattern not match: #{msg.inspect}"
      # `return` inside the block leaves receive_data itself, not just the block.
      return
    end

    # Use the recorded event time if available. The GELF timestamp is a
    # (possibly fractional) number of epoch seconds; wrap it in an EventTime
    # instead of emitting a bare Float so the router receives the event-time
    # type the rest of the pipeline expects.
    if record.key?('timestamp')
      time = Fluent::EventTime.from_time(Time.at(record.delete('timestamp').to_f))
    end

    # Postprocess recorded event
    strip_leading_underscore_(record) if @strip_leading_underscore

    emit(time, record)
  end
rescue => e
  log.error data.dump, error: e.to_s
  log.error_backtrace
end
#shutdown ⇒ Object
51 52 53 |
# File 'lib/fluent/plugin/in_gelf.rb', line 51
# Stops the plugin. No local teardown is performed here; the call is passed
# straight to the parent class.
#
# @return [Object] whatever the parent implementation returns
def shutdown
  super
end
#start ⇒ Object
45 46 47 48 49 |
# File 'lib/fluent/plugin/in_gelf.rb', line 45
# Starts the plugin: runs the parent start-up first, then opens the GELF
# listening socket via #listen.
#
# @return [Object] the value returned by #listen
def start
  super

  listen
end