Class: Fluent::Plugin::GelfOutput

Inherits:
Output
  • Object
show all
Includes:
GelfPluginUtil
Defined in:
lib/fluent/plugin/out_gelf.rb

Constant Summary collapse

DEFAULT_BUFFER_TYPE =
'memory'.freeze
DEFAULT_TIMEKEY =
5
DEFAULT_TIMEKEY_WAIT =
0
MAX_PAYLOAD_SIZE =

bytes

1000000

Constants included from GelfPluginUtil

GelfPluginUtil::LEVEL_MAP

Instance Method Summary collapse

Methods included from GelfPluginUtil

#make_gelfentry

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/fluent/plugin/out_gelf.rb', line 43

def configure(conf)
  super(conf)

  # a destination hostname or IP address must be provided
  raise Fluent::ConfigError.new("'host' parameter (hostname or address of Graylog2 server) is required") unless conf.has_key?('host')

  # validate udp_transport_type if protocol is udp
  if @protocol == 'udp'
    unless %w[WAN LAN].include?(@udp_transport_type)
      raise Fluent::ConfigError.new("'udp_transport_type' parameter should be either 'WAN' or 'LAN'")
    end
  end

  # choose protocol to pass to gelf-rb Notifier constructor
  if @protocol == 'udp'
    @proto = GELF::Protocol::UDP
  elsif @protocol == 'tcp'
    @proto = GELF::Protocol::TCP
  else
    raise Fluent::ConfigError.new("'protocol' parameter should be either 'udp' (default) or 'tcp'")
  end
end

#format(tag, time, record) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/fluent/plugin/out_gelf.rb', line 89

def format(tag, time, record)
  if defined? Fluent::EventTime and time.is_a? Fluent::EventTime then
    timestamp = time.to_i + (time.nsec.to_f/1000000000).round(3)
  else
    timestamp = time.to_i
  end

  begin
    make_gelfentry(
      tag,timestamp,record,
      {
        :use_record_host => @use_record_host,
        :add_msec_time => @add_msec_time,
        :max_bytes => @max_bytes
      }
    ).to_msgpack
  rescue Exception => e
    log.error sprintf(
      'Error trying to serialize %s: %s',
      record.to_s.force_encoding('UTF-8'),
      e.message.to_s.force_encoding('UTF-8')
    )
  end
end

#formatted_to_msgpack_binaryObject



39
40
41
# File 'lib/fluent/plugin/out_gelf.rb', line 39

def formatted_to_msgpack_binary
  true
end

#multi_workers_ready?Boolean

Returns:

  • (Boolean)


35
36
37
# File 'lib/fluent/plugin/out_gelf.rb', line 35

def multi_workers_ready?
  true
end

#shutdownObject



85
86
87
# File 'lib/fluent/plugin/out_gelf.rb', line 85

def shutdown
  super
end

#startObject



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/fluent/plugin/out_gelf.rb', line 66

def start
  super

  options = {:facility => 'fluentd', :protocol => @proto}

  # add tls key (tls_options) only when tls = True
  # see https://github.com/graylog-labs/gelf-rb/blob/72916932b789f7a6768c3cdd6ab69a3c942dbcef/lib/gelf/notifier.rb#L133-L140
  if @tls then
    options[:tls] = @tls_options
  end

  @conn = GELF::Notifier.new(@host, @port, @udp_transport_type, options)

  # Errors are not coming from Ruby so we use direct mapping
  @conn.level_mapping = 'direct'
  # file and line from Ruby are in this class, not relevant
  @conn.collect_file_and_line = false
end

#write(chunk) ⇒ Object



114
115
116
117
118
119
120
121
122
123
# File 'lib/fluent/plugin/out_gelf.rb', line 114

def write(chunk)
  chunk.msgpack_each do |data|
    begin
      @conn.notify!(data)
    rescue Exception => e
      log.warn "failed to flush the buffer.", error_class: e.class.to_s, error: e.to_s, plugin_id: plugin_id
      raise e
    end
  end
end