Class: Fluent::Plugin::GelfOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::Plugin::GelfOutput
show all
- Includes:
- GelfPluginUtil
- Defined in:
- lib/fluent/plugin/out_gelf.rb
Constant Summary
collapse
- DEFAULT_BUFFER_TYPE =
'memory'.freeze
- DEFAULT_TIMEKEY =
5
- DEFAULT_TIMEKEY_WAIT =
0
- MAX_PAYLOAD_SIZE =
1000000
GelfPluginUtil::LEVEL_MAP
Instance Method Summary
collapse
#make_gelfentry
Instance Method Details
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
# File 'lib/fluent/plugin/out_gelf.rb', line 43
# Validates the plugin configuration and selects the GELF transport protocol.
#
# After delegating to the parent implementation, this checks that a 'host'
# entry is present in +conf+, validates @udp_transport_type when the protocol
# is UDP, and stores the matching GELF protocol constant in @proto.
#
# @param conf the configuration element handed to the plugin; only
#   +has_key?('host')+ is called on it here
# @raise [Fluent::ConfigError] when 'host' is missing, when the UDP transport
#   type is neither 'WAN' nor 'LAN', or when the protocol is neither 'udp'
#   nor 'tcp'
def configure(conf)
  super(conf)

  # NOTE(review): has_key? is kept on purpose (rather than key?); Fluentd's
  # config element may track key access through it -- confirm before changing.
  unless conf.has_key?('host')
    raise Fluent::ConfigError, "'host' parameter (hostname or address of Graylog2 server) is required"
  end

  case @protocol
  when 'udp'
    # The transport type is only validated for UDP.
    unless %w[WAN LAN].include?(@udp_transport_type)
      raise Fluent::ConfigError, "'udp_transport_type' parameter should be either 'WAN' or 'LAN'"
    end
    @proto = GELF::Protocol::UDP
  when 'tcp'
    @proto = GELF::Protocol::TCP
  else
    raise Fluent::ConfigError, "'protocol' parameter should be either 'udp' (default) or 'tcp'"
  end
end
|
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
|
# File 'lib/fluent/plugin/out_gelf.rb', line 89
# Serializes a single event into a msgpack-encoded GELF entry.
#
# The timestamp handed to make_gelfentry is an Integer number of seconds,
# unless +time+ is a Fluent::EventTime, in which case the nanosecond part is
# added as a fraction rounded to three decimals (milliseconds).
#
# Serialization failures are logged and the event is dropped (nil is
# returned) instead of propagating.
#
# @param tag event tag, passed through to make_gelfentry
# @param time [Integer, Fluent::EventTime] event time (anything responding to
#   +to_i+ is accepted)
# @param record event record, passed through to make_gelfentry
# @return the result of +to_msgpack+ on the GELF entry, or nil when building
#   or serializing the entry raised a StandardError
def format(tag, time, record)
  timestamp =
    if defined?(Fluent::EventTime) && time.is_a?(Fluent::EventTime)
      time.to_i + (time.nsec.to_f / 1_000_000_000).round(3)
    else
      time.to_i
    end

  begin
    make_gelfentry(
      tag, timestamp, record,
      {
        :use_record_host => @use_record_host,
        :add_msec_time => @add_msec_time,
        :max_bytes => @max_bytes
      }
    ).to_msgpack
  rescue StandardError => e
    # StandardError (not Exception) so that signals and exit requests raised
    # while formatting are not swallowed here.
    #
    # sprintf is used rather than Kernel#format because +format+ inside this
    # object resolves to this very method.
    #
    # Both strings are dup'ed before force_encoding: force_encoding mutates
    # its receiver and raises FrozenError on a frozen string (for example an
    # exception message that is a frozen literal), which would otherwise
    # escape from this error handler.
    log.error sprintf(
      'Error trying to serialize %s: %s',
      record.to_s.dup.force_encoding('UTF-8'),
      e.message.to_s.dup.force_encoding('UTF-8')
    )
    # Explicit nil so the failure result does not depend on what the logger
    # returns. NOTE(review): assumes the caller skips nil results -- confirm
    # against the Fluentd output plugin contract.
    nil
  end
end
|
39
40
41
|
# File 'lib/fluent/plugin/out_gelf.rb', line 39
# Declares that the output of #format is msgpack binary.
#
# NOTE(review): presumably consulted by Fluentd so that #write can iterate the
# buffered chunk with msgpack_each -- confirm against the output plugin API.
#
# @return [Boolean] always true
def formatted_to_msgpack_binary
  true
end
|
#multi_workers_ready? ⇒ Boolean
35
36
37
|
# File 'lib/fluent/plugin/out_gelf.rb', line 35
# Declares whether this plugin is ready for multi-worker operation.
#
# NOTE(review): presumably consulted by Fluentd when more than one worker is
# configured -- confirm against the plugin API documentation.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end
|
#shutdown ⇒ Object
85
86
87
|
# File 'lib/fluent/plugin/out_gelf.rb', line 85
# Shuts the plugin down by delegating to the parent implementation.
# No plugin-specific teardown is performed here.
#
# @return whatever the parent implementation returns
def shutdown
  super
end
|
#start ⇒ Object
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
# File 'lib/fluent/plugin/out_gelf.rb', line 66
# Starts the plugin and creates the GELF notifier used by #write.
#
# The notifier is built from @host, @port and @udp_transport_type, with the
# facility fixed to 'fluentd' and the protocol taken from @proto. When @tls is
# truthy, @tls_options is passed along under the :tls key.
def start
  super

  notifier_options = { facility: 'fluentd', protocol: @proto }
  notifier_options[:tls] = @tls_options if @tls

  @conn = GELF::Notifier.new(@host, @port, @udp_transport_type, notifier_options)
  @conn.level_mapping = 'direct'
  @conn.collect_file_and_line = false
end
|
#write(chunk) ⇒ Object
114
115
116
117
118
119
120
121
122
123
|
# File 'lib/fluent/plugin/out_gelf.rb', line 114
# Sends every entry of a buffered chunk through the GELF notifier.
#
# Entries are read with chunk.msgpack_each and handed to @conn.notify! one at
# a time. The first failure is logged as a warning and re-raised, so entries
# after it in the chunk are not sent by this call.
#
# @param chunk buffered chunk; must respond to +msgpack_each+
# @raise re-raises whatever @conn.notify! raised
def write(chunk)
  chunk.msgpack_each do |entry|
    begin
      @conn.notify!(entry)
    rescue Exception => error
      # Exception (not only StandardError) is rescued so that every failure is
      # logged; nothing is swallowed because it is re-raised right away.
      log.warn(
        "failed to flush the buffer.",
        error_class: error.class.to_s,
        error: error.to_s,
        plugin_id: plugin_id
      )
      raise
    end
  end
end
|