Class: Fluent::Plugin::LogzioOutputBuffered
- Inherits:
-
Output
- Object
- Output
- Fluent::Plugin::LogzioOutputBuffered
show all
- Defined in:
- lib/fluent/plugin/out_logzio_buffered.rb
Defined Under Namespace
Classes: RetryableResponse
Instance Method Summary
collapse
Constructor Details
Returns a new instance of LogzioOutputBuffered.
54
55
56
57
|
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 54
def initialize
super
@registry = ::Prometheus::Client.registry
end
|
Instance Method Details
#compress(string) ⇒ Object
209
210
211
212
213
214
215
|
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 209
def compress(string)
wio = StringIO.new("w")
w_gz = Zlib::GzipWriter.new(wio)
w_gz.write(string)
w_gz.close
wio.string
end
|
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 27
def configure(conf)
super
compat_parameters_convert(conf, :buffer)
log.debug "Logz.io URL #{@endpoint_url}"
if conf['proxy_uri']
log.debug "Proxy #{@proxy_uri}"
ENV['http_proxy'] = @proxy_uri
end
if conf['proxy_cert']
log.debug "Proxy #{@proxy_cert}"
ENV['SSL_CERT_FILE'] = @proxy_cert
end
@metric_labels = {
type: 'logzio_buffered',
plugin_id: 'out_logzio',
}
@metrics = {
status_codes: get_gauge(
:logzio_status_codes,
'Status codes received from Logz.io', {"status_code":""}),
}
end
|
#do_post(bulk_records, bulk_size) ⇒ Object
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
|
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 158
def do_post(bulk_records, bulk_size)
log.debug "Sending a bulk of #{bulk_records.size} records, size #{bulk_size}B to Logz.io"
post = Net::HTTP::Post.new @uri.request_uri
post.body = bulk_records.join("\n")
if gzip
post.body = compress(post.body)
end
begin
response = @http.request @uri, post
rescue Net::HTTP::Persistent::Error => e
raise e.cause
return response
end
end
|
#encode_chunk(chunk) ⇒ Object
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
|
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 101
def encode_chunk(chunk)
records = []
bulk_size = 0
chunk.each { |tag, time, record|
record['@timestamp'] ||= Time.at(time).iso8601(3) if @output_include_time
record[@output_tags_fieldname] ||= tag.to_s if @output_include_tags
begin
json_record = Yajl.dump(record)
record_size = json_record.size + (1 if !records.empty?).to_i rescue
log.error "Adding record #{record} to buffer failed. Exception: #{$!}"
next
end
if record_size > @bulk_limit
if @bulk_limit_warning_limit.is_a?(Integer)
log.warn "Record with size #{record_size} exceeds #{@bulk_limit} and can't be sent to Logz.io. Record starts with (truncated at #{@bulk_limit_warning_limit} characters): #{json_record[0,@bulk_limit_warning_limit]}"
log.debug "Record with size #{record_size} exceeds #{@bulk_limit} and can't be sent to Logz.io. Record is: #{json_record}"
else
log.warn "Record with size #{record_size} exceeds #{@bulk_limit} and can't be sent to Logz.io. Record is: #{json_record}"
end
next
end
if bulk_size + record_size > @bulk_limit
yield(records, bulk_size)
records = []
bulk_size = 0
end
records.push(json_record)
bulk_size += record_size
}
if records
yield(records, bulk_size)
end
end
|
86
87
88
89
90
91
92
93
|
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 86
def format(tag, time, record)
if time.is_a?(Fluent::EventTime)
sec_frac = time.to_f
else
sec_frac = time * 1.0
end
[tag, sec_frac, record].to_msgpack
end
|
78
79
80
|
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 78
def formatted_to_msgpack_binary?
true
end
|
#get_gauge(name, docstring, extra_labels = {}) ⇒ Object
221
222
223
224
225
226
227
|
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 221
def get_gauge(name, docstring, = {})
if @registry.exist?(name)
@registry.get(name)
else
@registry.gauge(name, docstring: docstring, labels: @metric_labels.keys + .keys)
end
end
|
#merge_labels(extra_labels = {}) ⇒ Object
217
218
219
|
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 217
def merge_labels(= {})
@metric_labels.merge
end
|
#multi_workers_ready? ⇒ Boolean
82
83
84
|
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 82
def multi_workers_ready?
true
end
|
#process_code_400(bulk_records, response_body) ⇒ Object
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
|
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 178
def process_code_400(bulk_records, response_body)
max_log_field_size_bytes = 32000
oversized_logs_counter = response_body['oversizedLines'].to_i
new_bulk = []
for log_record in bulk_records
log.info "Oversized lines: #{oversized_logs_counter}" if oversized_logs_counter == 0
log.debug "No malformed lines, breaking"
break
end
new_log = Yajl.load(log_record)
msg_size = new_log['message'].size
if msg_size >= max_log_field_size_bytes
new_log['message'] = new_log['message'][0, max_log_field_size_bytes - 1]
log.debug "new log: #{new_log}"
new_bulk.append(Yajl.dump(new_log))
oversized_logs_counter -= 1
end
end
if new_bulk.size > 0
log.debug "Number of fixed bad logs to send: #{new_bulk.size}"
response = do_post(new_bulk, new_bulk.size)
if response.code.start_with?('2')
log.info "Succesfully sent bad logs"
else
log.warn "While trying to send fixed bad logs, got #{response.code} from Logz.io, will not try to re-send"
end
end
end
|
#send_bulk(bulk_records, bulk_size) ⇒ Object
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
|
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 139
def send_bulk(bulk_records, bulk_size)
response = do_post(bulk_records, bulk_size)
@metrics[:status_codes].increment(labels: merge_labels({'status_code': response.code.to_s}))
if not response.code.start_with?('2')
if response.code == '400'
log.warn "Received #{response.code} from Logzio. Some logs may be malformed or too long. Valid logs were succesfully sent into the system. Will try to proccess and send oversized logs. Response body: #{response.body}"
process_code_400(bulk_records, Yajl.load(response.body))
elsif response.code == '401'
log.error "Received #{response.code} from Logzio. Unauthorized, please check your logs shipping token. Will not retry sending. Response body: #{response.body}"
else
log.debug "Failed request body: #{post.body}"
log.error "Error while sending POST to #{@uri}: #{response.body}"
raise RetryableResponse, "Logzio listener returned (#{response.code}) for #{@uri}: #{response.body}", []
end
end
end
|
#shutdown ⇒ Object
74
75
76
|
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 74
def shutdown
super
end
|
#start ⇒ Object
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 59
def start
super
require 'net/http/persistent'
@uri = URI @endpoint_url
@http = Net::HTTP::Persistent.new name: 'fluent-plugin-logzio', proxy: :ENV
@http.['Content-Type'] = 'text/plain'
if @gzip
@http.['Content-Encoding'] = 'gzip'
end
@http.idle_timeout = @http_idle_timeout
@http.socket_options << [Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, 1]
log.debug "Started Logz.io shipper.."
end
|
#write(chunk) ⇒ Object
95
96
97
98
99
|
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 95
def write(chunk)
encode_chunk(chunk) { |bulk_records, bulk_size|
send_bulk(bulk_records, bulk_size)
}
end
|