Class: Fluent::Plugin::Sumologic

Inherits: Output
Defined in: lib/fluent/plugin/out_sumologic.rb

Constant Summary

DEFAULT_BUFFER_TYPE = "memory"
LOGS_DATA_TYPE = "logs"
METRICS_DATA_TYPE = "metrics"
DEFAULT_DATA_TYPE = LOGS_DATA_TYPE
DEFAULT_METRIC_FORMAT_TYPE = 'graphite'

Instance Method Summary

Constructor Details

#initialize ⇒ Sumologic



# File 'lib/fluent/plugin/out_sumologic.rb', line 187

def initialize
  super
end

Instance Method Details

#configure(conf) ⇒ Object

This method is called before starting.



# File 'lib/fluent/plugin/out_sumologic.rb', line 196

def configure(conf)

  compat_parameters_convert(conf, :buffer)

  unless conf['endpoint'] =~ URI::regexp
    raise Fluent::ConfigError, "Invalid SumoLogic endpoint url: #{conf['endpoint']}"
  end

  unless conf['data_type'].nil?
    unless conf['data_type'] =~ /\A(?:logs|metrics)\z/
      raise Fluent::ConfigError, "Invalid data_type #{conf['data_type']} must be logs or metrics"
    end
  end

  if conf['data_type'].nil? || conf['data_type'] == LOGS_DATA_TYPE
    unless conf['log_format'].nil?
      unless conf['log_format'] =~ /\A(?:json|text|json_merge|fields)\z/
        raise Fluent::ConfigError, "Invalid log_format #{conf['log_format']} must be text, json, json_merge or fields"
      end
    end
  end

  if conf['data_type'] == METRICS_DATA_TYPE && ! conf['metrics_data_type'].nil?
    unless conf['metrics_data_type'] =~ /\A(?:graphite|carbon2|prometheus)\z/
      raise Fluent::ConfigError, "Invalid metrics_data_type #{conf['metrics_data_type']} must be graphite or carbon2 or prometheus"
    end
  end

  conf['custom_fields'] = validate_key_value_pairs(conf['custom_fields'])
  if conf['custom_fields'].nil?
    conf.delete 'custom_fields'
  end
  unless conf['custom_fields'].nil?
    @log.debug "Custom fields: #{conf['custom_fields']}"
  end

  conf['custom_dimensions'] = validate_key_value_pairs(conf['custom_dimensions'])
  if conf['custom_dimensions'].nil?
    conf.delete 'custom_dimensions'
  end
  unless conf['custom_dimensions'].nil?
    @log.debug "Custom dimensions: #{conf['custom_dimensions']}"
  end

  # For some reason default is set incorrectly in unit-tests
  if conf['sumo_client'].nil? || conf['sumo_client'].strip.length == 0
    conf['sumo_client'] = 'fluentd-output'
  end

  @sumo_conn = SumologicConnection.new(
    conf['endpoint'],
    conf['verify_ssl'],
    conf['open_timeout'].to_i,
    conf['send_timeout'].to_i,
    conf['proxy_uri'],
    conf['disable_cookies'],
    conf['sumo_client'],
    conf['compress'],
    conf['compress_encoding'],
    log,
    )

  if !conf['max_request_size'].nil? && conf['max_request_size'].to_i <= 0
    conf['max_request_size'] = '0'
  end
  super
end
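
For reference, a standalone sketch of the same validation checks done here (the endpoint URL below is a placeholder, not a real collector):

require 'uri'

'https://example.com/receiver/v1/http/abc' =~ URI::regexp         # truthy => endpoint accepted
'metrics'    =~ /\A(?:logs|metrics)\z/                            # 0 => valid data_type
'json_merge' =~ /\A(?:json|text|json_merge|fields)\z/             # 0 => valid log_format
'carbon2'    =~ /\A(?:graphite|carbon2|prometheus)\z/             # 0 => valid metrics_data_type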

#dump_log(log) ⇒ Object

Strip _sumo_metadata and dump to JSON



# File 'lib/fluent/plugin/out_sumologic.rb', line 291

def dump_log(log)
  log.delete('_sumo_metadata')
  begin
    hash = JSON.parse(log[@log_key])
    log[@log_key] = hash
    Yajl.dump(log)
  rescue
    Yajl.dump(log)
  end
end
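
A minimal standalone rendition of the same logic, assuming @log_key is 'message' (both the key and the record below are illustrative):

require 'json'
require 'yajl'

log_key = 'message'
record  = {
  '_sumo_metadata' => { 'source' => 'app' },
  'message'        => '{"level":"info","msg":"started"}'
}

record.delete('_sumo_metadata')
begin
  record[log_key] = JSON.parse(record[log_key])  # inline the nested JSON string
rescue JSON::ParserError
  # not parseable JSON; keep the raw string field
end
Yajl.dump(record)  # => '{"message":{"level":"info","msg":"started"}}'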

#format(tag, time, record) ⇒ Object



# File 'lib/fluent/plugin/out_sumologic.rb', line 302

def format(tag, time, record)
  if defined? time.nsec
    mstime = time * 1000 + (time.nsec / 1000000)
    [mstime, record].to_msgpack
  else
    [time, record].to_msgpack
  end
end
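
The millisecond conversion can be checked in isolation; Time is used below as a stand-in for Fluent::EventTime, since both respond to #nsec (note the sketch uses to_i where the plugin multiplies the EventTime directly):

require 'msgpack'

record = { 'message' => 'hello' }
time   = Time.now

mstime = time.to_i * 1000 + (time.nsec / 1_000_000)  # 13-digit epoch milliseconds
packed = [mstime, record].to_msgpack
MessagePack.unpack(packed)  # => [mstime, {"message"=>"hello"}]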

#formatted_to_msgpack_binary ⇒ Object



# File 'lib/fluent/plugin/out_sumologic.rb', line 311

def formatted_to_msgpack_binary
  true
end

#log_to_str(log) ⇒ Object

Convert log to string and strip it



# File 'lib/fluent/plugin/out_sumologic.rb', line 338

def log_to_str(log)
  if log.is_a?(Array) or log.is_a?(Hash)
    log = Yajl.dump(log)
  end

  unless log.nil?
    log.strip!
  end

  return log
end
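
The same behaviour as a standalone lambda (inputs are illustrative):

require 'yajl'

log_to_str = lambda do |log|
  log = Yajl.dump(log) if log.is_a?(Array) || log.is_a?(Hash)
  log.strip! unless log.nil?
  log
end

log_to_str.call({ 'a' => 1 })     # => '{"a":1}'
log_to_str.call("  text \n".dup)  # => 'text'
log_to_str.call(nil)              # => nil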

#merge_json(record) ⇒ Object

Used to merge the log record into the top-level JSON



# File 'lib/fluent/plugin/out_sumologic.rb', line 275

def merge_json(record)
  if record.has_key?(@log_key)
    log = record[@log_key].strip
    if log[0].eql?('{') && log[-1].eql?('}')
      begin
        record = record.merge(JSON.parse(log))
        record.delete(@log_key)
      rescue JSON::ParserError
        # do nothing, ignore
      end
    end
  end
  record
end
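
Walking through the merge with an illustrative record, assuming @log_key is 'log':

require 'json'

log_key = 'log'
record  = { 'log' => '{"user":"alice","level":"warn"}', 'container' => 'web' }

log = record[log_key].strip
if log[0].eql?('{') && log[-1].eql?('}')
  begin
    record = record.merge(JSON.parse(log))
    record.delete(log_key)
  rescue JSON::ParserError
    # malformed JSON between the braces; keep the record unchanged
  end
end
record  # => {"container"=>"web", "user"=>"alice", "level"=>"warn"}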

#multi_workers_ready? ⇒ Boolean



# File 'lib/fluent/plugin/out_sumologic.rb', line 191

def multi_workers_ready?
  true
end

#shutdown ⇒ Object

This method is called when shutting down.



# File 'lib/fluent/plugin/out_sumologic.rb', line 270

def shutdown
  super
end

#start ⇒ Object

This method is called when starting.



# File 'lib/fluent/plugin/out_sumologic.rb', line 265

def start
  super
end

#sumo_key(sumo_metadata, chunk) ⇒ Object



# File 'lib/fluent/plugin/out_sumologic.rb', line 315

def sumo_key(sumo_metadata, chunk)
  source_name = sumo_metadata['source'] || @source_name
  source_name = extract_placeholders(source_name, chunk) unless source_name.nil?

  source_category = sumo_metadata['category'] || @source_category
  source_category = extract_placeholders(source_category, chunk) unless source_category.nil?

  source_host = sumo_metadata['host'] || @source_host
  source_host = extract_placeholders(source_host, chunk) unless source_host.nil?

  fields = sumo_metadata['fields'] || ""
  fields = extract_placeholders(fields, chunk) unless fields.nil?

  { :source_name => "#{source_name}", :source_category => "#{source_category}",
    :source_host => "#{source_host}", :fields => "#{fields}" }
end
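
With per-record metadata set (all values below are illustrative) and no placeholders to expand, the routing key comes out as:

sumo_metadata = {
  'source'   => 'nginx',
  'category' => 'prod/web',
  'host'     => 'web-01',
  'fields'   => 'team=infra,env=prod'
}
# sumo_key(sumo_metadata, chunk) =>
# { :source_name => "nginx", :source_category => "prod/web",
#   :source_host => "web-01", :fields => "team=infra,env=prod" }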

#sumo_timestamp(time) ⇒ Object

Convert timestamp to 13 digit epoch if necessary



# File 'lib/fluent/plugin/out_sumologic.rb', line 333

def sumo_timestamp(time)
  time.to_s.length == 13 ? time : time * 1000
end
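
The heuristic only checks the digit count, so second-resolution epochs are scaled up and millisecond epochs pass through unchanged:

sumo_timestamp = ->(time) { time.to_s.length == 13 ? time : time * 1000 }

sumo_timestamp.call(1_700_000_000)      # 10 digits => 1700000000000
sumo_timestamp.call(1_700_000_000_000)  # 13 digits => 1700000000000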

#validate_key_value_pairs(fields) ⇒ Object



# File 'lib/fluent/plugin/out_sumologic.rb', line 490

def validate_key_value_pairs(fields)
  if fields.nil?
    return fields
  end

  fields = fields.split(",").select { |field|
    field.split('=').length == 2
  }

  if fields.length == 0
    return nil
  end

  fields.join(',')
end
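
Only comma-separated entries with exactly one '=' survive; everything else is dropped, and an empty result becomes nil. The same logic as a standalone lambda:

validate = lambda do |fields|
  return fields if fields.nil?
  kept = fields.split(',').select { |f| f.split('=').length == 2 }
  kept.empty? ? nil : kept.join(',')
end

validate.call('env=prod,team=infra')  # => "env=prod,team=infra"
validate.call('env=prod,broken,x=')   # => "env=prod"
validate.call('nonsense')             # => nil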

#write(chunk) ⇒ Object

This method is called every flush interval to write the buffer chunk.



# File 'lib/fluent/plugin/out_sumologic.rb', line 351

def write(chunk)
  messages_list = {}

  # Sort messages
  chunk.msgpack_each do |time, record|
    # plugin dies randomly
    # https://github.com/uken/fluent-plugin-elasticsearch/commit/8597b5d1faf34dd1f1523bfec45852d380b26601#diff-ae62a005780cc730c558e3e4f47cc544R94
    next unless record.is_a? Hash
    sumo_metadata = record.fetch('_sumo_metadata', { :source => record[@source_name_key] })
    key           = sumo_key(sumo_metadata, chunk)
    log_format    = sumo_metadata['log_format'] || @log_format

    # Strip any unwanted newlines
    record[@log_key].chomp! if record[@log_key] && record[@log_key].respond_to?(:chomp!)

    case @data_type
    when 'logs'
      case log_format
      when 'text'
        log = log_to_str(record[@log_key])
      when 'json_merge'
        if @add_timestamp
          record = { @timestamp_key => sumo_timestamp(time) }.merge(record)
        end
        log = dump_log(merge_json(record))
      when 'fields'
        if @add_timestamp
          record = {  @timestamp_key => sumo_timestamp(time) }.merge(record)
        end
        log = dump_log(record)
      else
        if @add_timestamp
          record = { @timestamp_key => sumo_timestamp(time) }.merge(record)
        end
        log = dump_log(record)
      end
    when 'metrics'
      log = log_to_str(record[@log_key])
    end

    unless log.nil?
      if messages_list.key?(key)
        messages_list[key].push(log)
      else
        messages_list[key] = [log]
      end
    end

  end

  chunk_id = "##{chunk.dump_unique_id_hex(chunk.unique_id)}"
  # Push logs to sumo
  messages_list.each do |key, messages|
    source_name, source_category, source_host, fields = key[:source_name], key[:source_category],
      key[:source_host], key[:fields]

    # Merge custom and record fields
    if fields.nil? || fields.strip.length == 0
      fields = @custom_fields
    else
      fields = [fields,@custom_fields].compact.join(",")
    end

    if @max_request_size <= 0
      messages_to_send = [messages]
    else
      messages_to_send = []
      current_message = []
      current_length = 0
      messages.each do |message|
        current_message.push message
        current_length += message.length

        if current_length > @max_request_size
          messages_to_send.push(current_message)
          current_message = []
          current_length = 0
        end
        current_length += 1  # this is for newline
      end
      if current_message.length > 0
        messages_to_send.push(current_message)
      end
    end
    
    messages_to_send.each_with_index do |message, i|
      retries = 0
      start_time = Time.now
      sleep_time = @retry_min_interval

      while true
        common_log_part = "#{@data_type} records with source category '#{source_category}', source host '#{source_host}', source name '#{source_name}', chunk #{chunk_id}, try #{retries}, batch #{i}"

        begin
          @log.debug { "Sending #{message.count}; #{common_log_part}" }

          # NOTE: the `name = value` pairs below are local-variable
          # assignments evaluated for their values and passed positionally;
          # they serve only as visual labels for the arguments.
          @sumo_conn.publish(
            message.join("\n"),
              source_host         =source_host,
              source_category     =source_category,
              source_name         =source_name,
              data_type           =@data_type,
              metric_data_format  =@metric_data_format,
              collected_fields    =fields,
              dimensions          =@custom_dimensions
          )
          break
        rescue => e
          if !@use_internal_retry
            raise e
          end
          # increment retries
          retries += 1

          log.warn "error while sending request to sumo: #{e}; #{common_log_part}"
          log.warn_backtrace e.backtrace

          # drop data if
          #   - we reached the @retry_max_times limit
          #   - or we exceeded @retry_timeout
          if (retries >= @retry_max_times && @retry_max_times > 0) || (Time.now > start_time + @retry_timeout && @retry_timeout > 0)
            log.warn "dropping records; #{common_log_part}"
            break
          end

          log.info "going to retry to send data at #{Time.now + sleep_time}; #{common_log_part}"
          sleep sleep_time

          sleep_time *= 2
          if sleep_time > @retry_max_interval
            sleep_time = @retry_max_interval
          end
        end
      end
    end
  end

end
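
The max_request_size batching above can be traced in isolation. Note that a batch is closed only after its length exceeds the limit, so the message that crosses the threshold stays in the batch it overflows (values below are illustrative):

max_request_size = 10
messages = ['aaaa', 'bbbb', 'cccc', 'dd']

batches, current, length = [], [], 0
messages.each do |m|
  current << m
  length += m.length
  if length > max_request_size
    batches << current
    current, length = [], 0
  end
  length += 1  # account for the "\n" used when joining
end
batches << current unless current.empty?
batches  # => [["aaaa", "bbbb", "cccc"], ["dd"]]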