Class: Fluent::Plugin::GelfInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_gelf.rb

Constant Summary collapse

DEFAULT_PARSER =
'json'.freeze

Instance Method Summary collapse

Constructor Details

#initialize ⇒ GelfInput

Returns a new instance of GelfInput.



19
20
21
# File 'lib/fluent/plugin/in_gelf.rb', line 19

# Builds the plugin instance. Nothing GELF-specific is set up here; the
# constructor only delegates to the parent class through +super+.
def initialize
  super
end

Instance Method Details

#configure(conf) ⇒ Object



38
39
40
41
42
43
# File 'lib/fluent/plugin/in_gelf.rb', line 38

# Configures the plugin from +conf+.
#
# Runs +compat_parameters_convert+ for the +:parser+ section before handing
# +conf+ to the parent's +configure+ (presumably so that old-style flat
# parser options are accepted -- confirm against the compat_parameters
# helper), then stores the object returned by +parser_create+ in +@parser+.
#
# @param conf [Object] the configuration element supplied by the framework
def configure(conf)
  compat_parameters_convert(conf, :parser)
  super

  # The parser is created only after the parent has processed the
  # configuration.
  @parser = parser_create
end

#emit(time, record) ⇒ Object



101
102
103
104
105
# File 'lib/fluent/plugin/in_gelf.rb', line 101

# Passes one event to +router+ under the configured +@tag+.
#
# A StandardError raised while routing is not propagated: it is reported
# through +log.error+ together with the exception message, its class name,
# the tag and the record serialised with +Yajl.dump+.
#
# @param time [Object] event time, forwarded to the router unchanged
# @param record [Object] event payload, forwarded to the router unchanged
def emit(time, record)
  begin
    router.emit(@tag, time, record)
  rescue StandardError => err
    log.error(
      'gelf failed to emit',
      error: err.to_s,
      error_class: err.class.to_s,
      tag: @tag,
      record: Yajl.dump(record)
    )
  end
end

#listen ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/fluent/plugin/in_gelf.rb', line 85

# Opens the listening server for the configured transport and routes every
# received payload to #receive_data.
#
# When +@protocol_type+ is +:tcp+ a TCP server is created; any other value
# selects the UDP server. The return value is whatever +server_create+
# returns.
def listen
  log.info "listening gelf socket on #{@bind}:#{@port} with #{@protocol_type}"

  case @protocol_type
  when :tcp
    # NOTE(review): each TCP read is passed on as-is; no frame reassembly
    # is visible here -- confirm the parser copes with partial reads.
    server_create(:in_tcp_server, @port, bind: @bind) { |data, conn| receive_data(data, conn) }
  else
    # Graylog is ready to accept 8192 bytes of GELF, but chunked messages
    # also carry at least 12 bytes of chunk header. Rather than pinning
    # the limit to exactly that, accept datagrams up to 65536 bytes.
    server_create(:in_udp_server, @port, proto: :udp, bind: @bind, max_bytes: 65_536) { |data, sock| receive_data(data, sock) }
  end
end

#receive_data(data, addr) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/fluent/plugin/in_gelf.rb', line 55

# Decodes one received GELF payload and emits the resulting event.
#
# +data+ is first handed to +Gelfd2::Parser.parse+; a failure there is
# logged as a warning and the payload is dropped. The decoded message is
# then run through +@parser+, and the yielded record is emitted via #emit.
# Any other StandardError raised along the way is logged together with a
# dump of +data+ and swallowed.
#
# @param data [String] raw payload delivered by the server callback
# @param addr [Object] connection/socket from the server callback (unused)
def receive_data(data, addr)
  begin
    msg = Gelfd2::Parser.parse(data)
  rescue => e
    log.warn 'Gelfd failed to parse a message', error: e.to_s
    log.warn_backtrace
  end

  # Gelfd parser will return nil if it received and parsed a non-final chunk
  # (msg is also nil when parsing raised above).
  return if msg.nil?

  @parser.parse(msg) do |time, record|
    unless time && record
      log.warn "pattern not match: #{msg.inspect}"
      return
    end

    # Use the recorded event time if available. The 'timestamp' field is
    # always removed from the record, but it only replaces the parser's
    # time when it is a real number: a lenient #to_f would turn nil or a
    # non-numeric string into 0.0 and stamp the event with the Unix epoch.
    if record.key?('timestamp')
      recorded = begin
        Float(record.delete('timestamp'))
      rescue ArgumentError, TypeError
        nil
      end
      time = recorded if recorded
    end

    # Postprocess recorded event
    strip_leading_underscore_(record) if @strip_leading_underscore

    emit(time, record)
  end
rescue => e
  log.error data.dump, error: e.to_s
  log.error_backtrace
end

#shutdown ⇒ Object



51
52
53
# File 'lib/fluent/plugin/in_gelf.rb', line 51

# Shutdown hook. This plugin adds no teardown of its own; the call is
# passed straight to the parent class through +super+.
def shutdown
  super
end

#start ⇒ Object



45
46
47
48
49
# File 'lib/fluent/plugin/in_gelf.rb', line 45

# Start hook: runs the parent's start logic first, then calls #listen to
# begin accepting GELF messages.
def start
  super

  # Only start listening once the parent has finished starting up.
  listen
end