Class: Fluent::Plugin::LogzioOutputBuffered
- Inherits: Fluent::Plugin::Output
  - Object
  - Fluent::Plugin::Output
  - Fluent::Plugin::LogzioOutputBuffered
- Defined in:
  lib/fluent/plugin/out_logzio_buffered.rb
Defined Under Namespace
Classes: RetryableResponse
Instance Method Summary
- #initialize ⇒ LogzioOutputBuffered (constructor)
- #compress(string) ⇒ Object
- #configure(conf) ⇒ Object
- #do_post(bulk_records, bulk_size) ⇒ Object
- #encode_chunk(chunk) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #formatted_to_msgpack_binary? ⇒ Boolean
- #get_gauge(name, docstring, extra_labels = {}) ⇒ Object
- #merge_labels(extra_labels = {}) ⇒ Object
- #multi_workers_ready? ⇒ Boolean
- #process_code_400(bulk_records, response_body) ⇒ Object
- #send_bulk(bulk_records, bulk_size) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ LogzioOutputBuffered
Returns a new instance of LogzioOutputBuffered.
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 53
def initialize
super
@registry = ::Prometheus::Client.registry
end
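The constructor only hooks the plugin into the prometheus-client default registry, so anything it registers later (see #get_gauge) is shared process-wide. A small illustrative check, not part of the plugin:

require 'prometheus/client'

# The default registry is a process-wide singleton, so other code in the same
# process (for example a Prometheus exporter plugin) can read the gauge this
# output registers.
registry = ::Prometheus::Client.registry
registry.exist?(:logzio_status_codes)  # => true once #configure has run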
Instance Method Details
#compress(string) ⇒ Object
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 208
def compress(string)
wio = StringIO.new("w")
w_gz = Zlib::GzipWriter.new(wio)
w_gz.write(string)
w_gz.close
wio.string
end
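A quick way to sanity-check the result is to gunzip it again; this round-trip is purely illustrative and not part of the plugin:

require 'zlib'
require 'stringio'

# Decompressing what #compress produced should return the original bulk body.
compressed = compress('{"message":"hello"}')
Zlib::GzipReader.new(StringIO.new(compressed)).read  # => "{\"message\":\"hello\"}"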
#configure(conf) ⇒ Object
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 26
def configure(conf)
super
compat_parameters_convert(conf, :buffer)
log.debug "Logz.io URL #{@endpoint_url}"
if conf['proxy_uri']
log.debug "Proxy #{@proxy_uri}"
ENV['http_proxy'] = @proxy_uri
end
if conf['proxy_cert']
log.debug "Proxy #{@proxy_cert}"
ENV['SSL_CERT_FILE'] = @proxy_cert
end
@metric_labels = {
type: 'logzio_buffered',
plugin_id: 'out_logzio',
}
@metrics = {
status_codes: get_gauge(
:logzio_status_codes,
'Status codes received from Logz.io', {"status_code":""}),
}
end
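For orientation, a minimal <match> block exercising the parameters referenced in #configure, #start and #encode_chunk might look like the sketch below. The endpoint URL and all values are placeholders; check your installed plugin version for the authoritative parameter list and defaults.

<match **>
  @type logzio_buffered
  endpoint_url https://listener.logz.io:8071?token=YOUR_SHIPPING_TOKEN&type=my_type
  output_include_time true
  output_include_tags true
  output_tags_fieldname fluentd_tags
  gzip true
  http_idle_timeout 10
  # proxy_uri http://user:pass@proxy.example:3128
  # proxy_cert /path/to/proxy/ca.pem
  <buffer>
    @type memory
    flush_interval 5s
  </buffer>
</match>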
#do_post(bulk_records, bulk_size) ⇒ Object
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 157
def do_post(bulk_records, bulk_size)
log.debug "Sending a bulk of #{bulk_records.size} records, size #{bulk_size}B to Logz.io"
post = Net::HTTP::Post.new @uri.request_uri
post.body = bulk_records.join("\n")
if gzip
post.body = compress(post.body)
end
begin
response = @http.request @uri, post
rescue Net::HTTP::Persistent::Error => e
raise e.cause
end
return response
end
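The bulk body is newline-delimited JSON: each element of bulk_records is already a serialized record (see #encode_chunk), and the joined body is gzip-compressed as a whole when gzip is enabled. Illustration only:

bulk_records = ['{"message":"a"}', '{"message":"b"}']
bulk_records.join("\n")  # => "{\"message\":\"a\"}\n{\"message\":\"b\"}"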
#encode_chunk(chunk) ⇒ Object
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 100
def encode_chunk(chunk)
records = []
bulk_size = 0
chunk.each { |tag, time, record|
record['@timestamp'] ||= Time.at(time).iso8601(3) if @output_include_time
record[@output_tags_fieldname] ||= tag.to_s if @output_include_tags
begin
json_record = Yajl.dump(record)
record_size = json_record.size + (1 if !records.empty?).to_i
rescue
log.error "Adding record #{record} to buffer failed. Exception: #{$!}"
next
end
if record_size > @bulk_limit
if @bulk_limit_warning_limit.is_a?(Integer)
log.warn "Record with size #{record_size} exceeds #{@bulk_limit} and can't be sent to Logz.io. Record starts with (truncated at #{@bulk_limit_warning_limit} characters): #{json_record[0,@bulk_limit_warning_limit]}"
log.debug "Record with size #{record_size} exceeds #{@bulk_limit} and can't be sent to Logz.io. Record is: #{json_record}"
else
log.warn "Record with size #{record_size} exceeds #{@bulk_limit} and can't be sent to Logz.io. Record is: #{json_record}"
end
next
end
if bulk_size + record_size > @bulk_limit
yield(records, bulk_size)
records = []
bulk_size = 0
end
records.push(json_record)
bulk_size += record_size
}
if records
yield(records, bulk_size)
end
end
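The size accounting charges one extra byte per record after the first to cover the "\n" separator added by #do_post: (1 if condition) evaluates to nil when the condition is false, and nil.to_i is 0. Illustration only:

records = []
'{"message":"a"}'.size + (1 if !records.empty?).to_i  # => 15, first record, no separator

records = ['{"message":"a"}']
'{"message":"b"}'.size + (1 if !records.empty?).to_i  # => 16, separator byte included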
#format(tag, time, record) ⇒ Object
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 85
def format(tag, time, record)
if time.is_a?(Fluent::EventTime)
sec_frac = time.to_f
else
sec_frac = time * 1.0
end
[tag, sec_frac, record].to_msgpack
end
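Buffer entries are msgpack-encoded [tag, time, record] triples, with Fluent::EventTime flattened to a float so sub-second precision is preserved. A round-trip sketch using the msgpack gem, illustration only:

require 'msgpack'

packed = ['app.logs', 1700000000.123, { 'message' => 'hello' }].to_msgpack
MessagePack.unpack(packed)  # => ["app.logs", 1700000000.123, {"message"=>"hello"}]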
#formatted_to_msgpack_binary? ⇒ Boolean
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 77
def formatted_to_msgpack_binary?
true
end
#get_gauge(name, docstring, extra_labels = {}) ⇒ Object
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 220
def get_gauge(name, docstring, extra_labels = {})
if @registry.exist?(name)
@registry.get(name)
else
@registry.gauge(name, docstring: docstring, labels: @metric_labels.keys + extra_labels.keys)
end
end
#merge_labels(extra_labels = {}) ⇒ Object
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 216
def merge_labels(extra_labels = {})
@metric_labels.merge extra_labels
end
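Given the base labels set in #configure, merge_labels layers the per-call labels on top. Illustration only:

# @metric_labels is { type: 'logzio_buffered', plugin_id: 'out_logzio' } after #configure
merge_labels({'status_code': '200'})
# => {:type=>"logzio_buffered", :plugin_id=>"out_logzio", :status_code=>"200"}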
#multi_workers_ready? ⇒ Boolean
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 81
def multi_workers_ready?
true
end
#process_code_400(bulk_records, response_body) ⇒ Object
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 177
def process_code_400(bulk_records, response_body)
max_log_field_size_bytes = 32000
oversized_logs_counter = response_body['oversizedLines'].to_i
new_bulk = []
for log_record in bulk_records
log.info "Oversized lines: #{oversized_logs_counter}" if oversized_logs_counter == 0
log.debug "No malformed lines, breaking"
break
end
new_log = Yajl.load(log_record)
msg_size = new_log['message'].size
if msg_size >= max_log_field_size_bytes
new_log['message'] = new_log['message'][0, max_log_field_size_bytes - 1]
log.debug "new log: #{new_log}"
new_bulk.append(Yajl.dump(new_log))
oversized_logs_counter -= 1
end
end
if new_bulk.size > 0
log.debug "Number of fixed bad logs to send: #{new_bulk.size}"
response = do_post(new_bulk, new_bulk.size)
if response.code.start_with?('2')
log.info "Succesfully sent bad logs"
else
log.warn "While trying to send fixed bad logs, got #{response.code} from Logz.io, will not try to re-send"
end
end
end
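The only field this method reads from the 400 response body is oversizedLines. A hypothetical body, shown purely to illustrate the parsing (the real listener response may contain additional fields):

require 'yajl'

response_body = Yajl.load('{"oversizedLines": 2}')
response_body['oversizedLines'].to_i  # => 2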
#send_bulk(bulk_records, bulk_size) ⇒ Object
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 138
def send_bulk(bulk_records, bulk_size)
response = do_post(bulk_records, bulk_size)
@metrics[:status_codes].increment(labels: merge_labels({'status_code': response.code.to_s}))
if not response.code.start_with?('2')
if response.code == '400'
log.warn "Received #{response.code} from Logzio. Some logs may be malformed or too long. Valid logs were succesfully sent into the system. Will try to proccess and send oversized logs. Response body: #{response.body}"
process_code_400(bulk_records, Yajl.load(response.body))
elsif response.code == '401'
log.error "Received #{response.code} from Logzio. Unauthorized, please check your logs shipping token. Will not retry sending. Response body: #{response.body}"
else
log.debug "Failed request body: #{post.body}"
log.error "Error while sending POST to #{@uri}: #{response.body}"
raise RetryableResponse, "Logzio listener returned (#{response.code}) for #{@uri}: #{response.body}", []
end
end
end
#shutdown ⇒ Object
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 73
def shutdown
super
end
#start ⇒ Object
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 58
def start
super
require 'net/http/persistent'
@uri = URI @endpoint_url
@http = Net::HTTP::Persistent.new name: 'fluent-plugin-logzio', proxy: :ENV
@http.headers['Content-Type'] = 'text/plain'
if @gzip
@http.headers['Content-Encoding'] = 'gzip'
end
@http.idle_timeout = @http_idle_timeout
@http.socket_options << [Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, 1]
log.debug "Started Logz.io shipper.."
end
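The persistent connection is built once and reused for every bulk. A standalone sketch of the same net-http-persistent setup outside Fluentd; the name and timeout below are illustrative:

require 'net/http/persistent'

http = Net::HTTP::Persistent.new name: 'example-shipper', proxy: :ENV
http.headers['Content-Type'] = 'text/plain'
http.idle_timeout = 5
http.socket_options << [Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, 1]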
#write(chunk) ⇒ Object
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 94
def write(chunk)
encode_chunk(chunk) { |bulk_records, bulk_size|
send_bulk(bulk_records, bulk_size)
}
end