Class: Fluent::Plugin::VmwareLoginsightOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_vmware_loginsight.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/fluent/plugin/out_vmware_loginsight.rb', line 101

# Finishes plugin configuration once the parent class has processed +conf+.
#
# Derives the OpenSSL peer-verification mode from @ssl_verify, maps the
# @authentication setting to an auth symbol (:basic for 'basic', :none for
# anything else) and clears the timestamp of the last request.
#
# @param conf the configuration object, forwarded unchanged to +super+
def configure(conf)
  super

  @ssl_verify_mode = if @ssl_verify
                       OpenSSL::SSL::VERIFY_PEER
                     else
                       OpenSSL::SSL::VERIFY_NONE
                     end
  @auth = 'basic' == @authentication ? :basic : :none

  # No request has been sent yet.
  @last_request_time = nil
end

#create_loginsight_event(time, record) ⇒ Object



155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/fluent/plugin/out_vmware_loginsight.rb', line 155

# Builds one Log Insight event hash from a single record.
#
# The record is flattened first when @flatten_hashes is set. Every non-nil
# entry is then either appended to the event text (when its shortened key is
# listed in @log_text_keys) or emitted as a {"name", "content"} field.
# Values are stringified (hashes as JSON) and tagged as UTF-8.
#
# @param time   [#to_f] event time in seconds; emitted as integer milliseconds
# @param record [Hash]  the record to convert
# @return [Hash] a hash with "fields", "text" and "timestamp" keys
def create_loginsight_event(time, record)
  flattened_records = @flatten_hashes ? flatten_record(record, []) : record
  fields = []
  keys = []
  log = ''
  flattened_records.each do |key, value|
    next if value.nil?

    # LI doesn't support duplicate fields, make unique names by appending underscore
    key = shorten_key(key)
    key += '_' while keys.include?(key)
    keys.push(key)
    key.force_encoding("utf-8")

    # convert value to json string if its a hash and to string if not already a string
    begin
      value = value.to_json if value.is_a?(Hash)
      value = value.to_s
      value = value.dup if value.frozen? # if value is immutable, use a copy.
      value.force_encoding("utf-8")
    rescue StandardError => e
      # Only StandardError is handled here so that Interrupt/SystemExit and
      # similar process-level exceptions still propagate.
      # Serialising the whole record can fail for the same reason the value
      # did, so the preview must never raise out of this handler.
      preview = begin
                  record.to_json[0, 1024]
                rescue StandardError
                  record.inspect[0, 1024]
                end
      $log.warn "force_encoding exception: " "#{e.class}, '#{e.message}', " \
                "\n Request: #{key} #{preview}"
      value = "Exception during conversion: #{e.message}"
    end

    if @log_text_keys.include?(key)
      # Append to the event text, unless the text gathered so far is exactly
      # this value already.
      if log != "#{value}"
        log = log.empty? ? "#{value}" : "#{log} #{value}"
      end
    else
      # If there is time information available, update time for LI. LI ignores
      # time if it is out of the error/adjusment window of 10 mins. in such
      # cases we would still like to preserve time info, so add it as event.
      # TODO Ignore the below block for now. Handle the case for time being in
      #      different formats than milliseconds
      #if ['time', '_source_realtime_timestamp'].include?(key)
      #  time = value
      #end
      fields << {"name" => key, "content" => value}
    end
  end

  {
    "fields" => fields,
    # Strip empty lines from the collected text.
    "text" => log.gsub(/^$\n/, ''),
    "timestamp" => (time.to_f * 1000).floor
  }
end

#flatten_record(record, prefix = []) ⇒ Object



215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
# File 'lib/fluent/plugin/out_vmware_loginsight.rb', line 215

# Recursively flattens a nested record into a single-level hash.
#
# * Hash entries whose key is listed in @log_text_keys are kept as-is under
#   the stringified key, without any prefix and without descending further.
# * Other hash entries are flattened with their stringified key appended to
#   the prefix.
# * Array elements are flattened under the unchanged prefix, so scalar
#   elements of one array land on the same key and the last one wins.
# * Anything else becomes a single pair keyed by the prefix parts joined
#   with @flatten_hashes_separator.
#
# @param record the value to flatten (Hash, Array or leaf value)
# @param prefix [Array<String>] key path accumulated so far
# @return [Hash] the flattened key/value pairs
def flatten_record(record, prefix=[])
  if record.is_a?(Hash)
    record.each_with_object({}) do |(name, item), flat|
      if @log_text_keys.include?(name)
        flat[name.to_s] = item
      else
        flat.merge!(flatten_record(item, prefix + [name.to_s]))
      end
    end
  elsif record.is_a?(Array)
    record.each_with_object({}) do |item, flat|
      flat.merge!(flatten_record(item, prefix))
    end
  else
    { prefix.join(@flatten_hashes_separator) => record }
  end
end

#format_url ⇒ Object



115
116
117
118
# File 'lib/fluent/plugin/out_vmware_loginsight.rb', line 115

# Assembles the target URL string from @scheme and the values returned by
# the host, port, path and agent_id readers.
#
# @return [String] "<scheme>://<host>:<port>/<path>/<agent_id>"
def format_url
  authority = "#{host}:#{port}"
  "#{@scheme}://#{authority}/#{path}/#{agent_id}"
end

#get_body(req) ⇒ Object



237
238
239
240
241
242
243
244
245
246
# File 'lib/fluent/plugin/out_vmware_loginsight.rb', line 237

# Returns a short, readable preview of a request body.
#
# When @http_compress is set the body is gunzipped first. The preview is the
# first 1024 characters of the body, starting at the first character. A
# missing or empty body yields an empty string instead of raising.
#
# @param req an object responding to +body+ (e.g. a Net::HTTP request)
# @return [String] at most 1024 characters of the (decompressed) body
def get_body(req)
  raw = req.body.to_s
  body = if @http_compress && !raw.empty?
           # The block form closes the reader once the body has been read.
           Zlib::GzipReader.wrap(StringIO.new(raw)) { |gz| gz.read }
         else
           raw
         end
  body[0, 1024]
end

#handle_records(chunk) ⇒ Object



320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
# File 'lib/fluent/plugin/out_vmware_loginsight.rb', line 320

# Converts every record of a chunk into a Log Insight event and sends the
# events in batches whose combined serialized size stays within
# @max_batch_size.
#
# Sizes are measured in bytes of each event's JSON form, so multi-byte text
# is counted at its real payload size.
# NOTE(review): this assumes @max_batch_size is a byte limit - confirm
# against the plugin's configuration documentation.
#
# An event that alone exceeds @max_batch_size is dropped with a warning.
#
# @param chunk an enumerable yielding (time, record) pairs
def handle_records(chunk)
  uri = URI.parse(format_url)
  events = []
  count = 0
  chunk.each do |time, record|
    new_event = create_loginsight_event(time, record)
    payload = new_event.to_json # serialized once, reused for size and preview
    new_event_size = payload.bytesize
    if new_event_size > @max_batch_size
      $log.warn "dropping event larger than max_batch_size: #{payload[0, 1024]}"
      next
    end

    # Flush the current batch when this event would push it over the limit.
    if (count + new_event_size) > @max_batch_size
      send_events(uri, events)
      events = []
      count = 0
    end
    count += new_event_size
    events << new_event
  end
  send_events(uri, events) unless events.empty?
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


120
121
122
# File 'lib/fluent/plugin/out_vmware_loginsight.rb', line 120

# Reports whether this plugin may be used with multiple workers.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end

#send_events(uri, events) ⇒ Object



310
311
312
313
314
315
316
317
318
# File 'lib/fluent/plugin/out_vmware_loginsight.rb', line 310

# Wraps the given events in an {"events" => [...]} envelope and sends them.
#
# The request class is looked up on Net::HTTP from @http_method (e.g. :post
# resolves to Net::HTTP::Post) and is created for the path component of
# +uri+. Body and headers are filled in by set_body / set_header before the
# request is handed to send_request.
#
# @param uri    [URI]   target of the request
# @param events [Array] events to send
# @return whatever send_request returns
def send_events(uri, events)
  request_class = Net::HTTP.const_get(@http_method.to_s.capitalize)
  req = request_class.new(uri.path)
  set_body(req, { "events" => events })
  set_header(req)
  send_request(req, uri)
end

#send_request(req, uri) ⇒ Object



248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
# File 'lib/fluent/plugin/out_vmware_loginsight.rb', line 248

# Sends a prepared request to the host/port of +uri+ over a new Net::HTTP
# connection.
#
# Behaviour visible in this method:
# * Rate limiting: when @rate_limit_msec is non-zero and fewer than that many
#   milliseconds have passed since the previous attempt, the request is NOT
#   sent; an info line is logged and the method returns immediately.
# * HTTP basic auth is applied when @auth is "basic".
# * Failures raising a StandardError are attempted twice in total (one
#   retry). After the last failed attempt the error is re-raised only when
#   @raise_on_error is set; otherwise it is swallowed.
# * A non-success response is neither logged nor raised: the summary string
#   is built, but the log call that would use it is commented out.
#
# @param req a Net::HTTP request object, body and headers already set
# @param uri [URI] supplies host, port and scheme for the connection
def send_request(req, uri)
  # Rate limiting only applies once a previous attempt has been recorded.
  is_rate_limited = (@rate_limit_msec != 0 and not @last_request_time.nil?)
  if is_rate_limited and ((Time.now.to_f - @last_request_time) * 1000.0 < @rate_limit_msec)
    $log.info('Dropped request due to rate limiting')
    return
  end
  if @auth and @auth.to_s.eql? "basic"
    req.basic_auth(@username, @password)
  end
  begin
    # `retry` re-runs this begin block; `||=` keeps the remaining count
    # across those re-runs instead of resetting it to 2.
    retries ||= 2
    response = nil
    # Recorded before the attempt, so failed attempts also count towards
    # rate limiting.
    @last_request_time = Time.now.to_f

    http_conn = Net::HTTP.new(uri.host, uri.port)
    # For debugging, set this
    http_conn.set_debug_output($stdout) if @http_conn_debug
    http_conn.use_ssl = (uri.scheme == 'https')
    if http_conn.use_ssl?
      http_conn.ca_file = @ca_file
    end
    http_conn.verify_mode = @ssl_verify_mode

    # The block form of #start finishes the connection when the block exits.
    response = http_conn.start do |http|
      http.read_timeout = @request_timeout
      http.request(req)
    end
  rescue => e # rescue all StandardErrors
    # server didn't respond
    # Be careful while turning on below log, if LI instance can't be reached and you're sending
    # log-container logs to LI as well, you may end up in a cycle.
    # TODO handle the cyclic case at plugin level if possible.
    # $log.warn "Net::HTTP.#{req.method.capitalize} raises exception: " \
    #   "#{e.class}, '#{e.message}', \n Request: #{get_body(req)}"
    # Decrement first; retry only while attempts remain.
    retry unless (retries -= 1).zero?
    raise e if @raise_on_error
  else
     # Runs only when no exception was raised by the attempt.
     unless response and response.is_a?(Net::HTTPSuccess)
        res_summary = if response
                         "Response Code: #{response.code}\n"\
                         "Response Message: #{response.message}\n" \
                         "Response Body: #{response.body}"
                      else
                         "Response = nil"
                      end
        # ditto cyclic warning
        # $log.warn "Failed to #{req.method} #{uri}\n(#{res_summary})\n" \
        #   "Request Size: #{req.body.size} Request Body: #{get_body(req)}"
     end #end unless
  end # end begin
end

#set_body(req, event_req) ⇒ Object

end send_request



300
301
302
303
304
305
306
307
308
# File 'lib/fluent/plugin/out_vmware_loginsight.rb', line 300

# Serializes +event_req+ to JSON and stores it as the request body,
# gzip-compressing the payload when @http_compress is set.
#
# @param req       an object with a +body=+ writer (e.g. a Net::HTTP request)
# @param event_req the payload; must respond to +to_json+
# @return [String] the string that was assigned as the body
def set_body(req, event_req)
  payload = event_req.to_json
  req.body = if @http_compress
               writer = Zlib::GzipWriter.new(StringIO.new)
               writer << payload
               # close returns the underlying StringIO holding the gzip data
               writer.close.string
             else
               payload
             end
end

#set_gzip_header(req) ⇒ Object



139
140
141
142
# File 'lib/fluent/plugin/out_vmware_loginsight.rb', line 139

# Marks the request body as gzip-encoded.
#
# @param req an object supporting +[]=+ for headers
# @return the same +req+ object
def set_gzip_header(req)
  req.tap { |request| request['Content-Encoding'] = 'gzip' }
end

#set_header(req) ⇒ Object



124
125
126
127
128
129
130
131
132
# File 'lib/fluent/plugin/out_vmware_loginsight.rb', line 124

# Applies the headers that match the current settings: the JSON content
# type when @serializer is 'json', and the gzip content encoding when
# @http_compress is set.
#
# @param req the request to decorate
# @return the same +req+ object
def set_header(req)
  set_json_header(req) if @serializer == 'json'
  set_gzip_header(req) if @http_compress
  req
end

#set_json_header(req) ⇒ Object



134
135
136
137
# File 'lib/fluent/plugin/out_vmware_loginsight.rb', line 134

# Declares the request body as JSON.
#
# @param req an object supporting +[]=+ for headers
# @return the same +req+ object
def set_json_header(req)
  req.tap { |request| request['Content-Type'] = 'application/json' }
end

#shorten_key(key) ⇒ Object



144
145
146
147
148
149
150
151
152
153
# File 'lib/fluent/plugin/out_vmware_loginsight.rb', line 144

# Normalizes a record key into a Log Insight field name.
#
# The key is stringified first, so Symbol keys are accepted just like in
# flatten_record. Characters LI does not allow in a field name
# ('/', '.', '-', '\', '@') are replaced with @flatten_hashes_separator and
# the result is lower-cased. Each @shorten_keys pair is then applied as a
# plain-text substitution; because this happens after lower-casing, the
# match strings only hit when they are lower-case themselves.
#
# @param key [String, Symbol, #to_s] the original key
# @return [String] a new string holding the normalized key
def shorten_key(key)
  # LI doesn't allow some characters in field 'name'
  # like '/', '-', '\', '.', etc. so replace them with @flatten_hashes_separator
  normalized = key.to_s.gsub(/[\/\.\-\\\@]/, @flatten_hashes_separator).downcase
  # shorten field names using provided shorten_keys parameters
  @shorten_keys.reduce(normalized) do |name, (match, replace)|
    name.gsub(match.to_s, replace)
  end
end

#write(chunk) ⇒ Object

Sync Buffered Output



346
347
348
# File 'lib/fluent/plugin/out_vmware_loginsight.rb', line 346

# Synchronous buffered-output entry point: hands the chunk straight to
# handle_records.
#
# @param chunk the buffer chunk to flush
# @return whatever handle_records returns
def write(chunk)
  handle_records chunk
end