Class: Fluent::GelfInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_gelf.rb

Instance Method Summary collapse

Constructor Details

#initializeGelfInput

Returns a new instance of GelfInput.



13
14
15
16
17
# File 'lib/fluent/plugin/in_gelf.rb', line 13

def initialize
  super
  require 'fluent/plugin/socket_util'
  require 'gelfd'
end

Instance Method Details

#configure(conf) ⇒ Object



40
41
42
43
44
45
# File 'lib/fluent/plugin/in_gelf.rb', line 40

def configure(conf)
  super

  @parser = Plugin.new_parser(@format)
  @parser.configure(conf)
end

#emit(time, record) ⇒ Object



107
108
109
110
111
# File 'lib/fluent/plugin/in_gelf.rb', line 107

def emit(time, record)
  router.emit(@tag, time, record)
rescue => e
  log.error 'gelf failed to emit', error: e.to_s, error_class: e.class.to_s, tag: @tag, record: Yajl.dump(record)
end

#listen(callback) ⇒ Object



96
97
98
99
100
101
102
103
104
105
# File 'lib/fluent/plugin/in_gelf.rb', line 96

def listen(callback)
  log.info "listening gelf socket on #{@bind}:#{@port} with #{@protocol_type}"
  if @protocol_type == :tcp
    Coolio::TCPServer.new(@bind, @port, SocketUtil::TcpHandler, log, "\n", callback)
  else
    @usock = SocketUtil.create_udp_socket(@bind)
    @usock.bind(@bind, @port)
    SocketUtil::UdpHandler.new(@usock, log, 8192, callback)
  end
end

#receive_data(data, addr) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/fluent/plugin/in_gelf.rb', line 69

def receive_data(data, addr)
  begin
    msg = Gelfd::Parser.parse(data)
  rescue => e
    log.warn 'Gelfd failed to parse a message', error: e.to_s
    log.warn_backtrace
  end

  # Gelfd parser will return nil if it received and parsed a non-final chunk
  return if msg.nil?

  @parser.parse(msg) { |time, record|
    unless time && record
      log.warn "pattern not match: #{msg.inspect}"
      return
    end

    # Use the recorded event time if available
    time = record.delete('timestamp').to_i if record.key?('timestamp')

    emit(time, record)
  }
rescue => e
  log.error data.dump, error: e.to_s
  log.error_backtrace
end

#runObject



62
63
64
65
66
67
# File 'lib/fluent/plugin/in_gelf.rb', line 62

def run
  @loop.run(@blocking_timeout)
rescue
  log.error 'unexpected error', error: $!.to_s
  log.error_backtrace
end

#shutdownObject



55
56
57
58
59
60
# File 'lib/fluent/plugin/in_gelf.rb', line 55

def shutdown
  @loop.watchers.each { |w| w.detach }
  @loop.stop
  @handler.close
  @thread.join
end

#startObject



47
48
49
50
51
52
53
# File 'lib/fluent/plugin/in_gelf.rb', line 47

def start
  @loop = Coolio::Loop.new
  @handler = listen(method(:receive_data))
  @loop.attach(@handler)

  @thread = Thread.new(&method(:run))
end