Class: Fluent::Plugin::Sumologic

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_sumologic.rb

Constant Summary collapse

# Name of the default buffer type; where it is applied is not visible in this excerpt.
DEFAULT_BUFFER_TYPE =
"memory"
# `data_type` value that selects log payloads (compared against in #configure).
LOGS_DATA_TYPE =
"logs"
# `data_type` value that selects metric payloads (compared against in #configure).
METRICS_DATA_TYPE =
"metrics"
# Default data type: an alias of LOGS_DATA_TYPE.
DEFAULT_DATA_TYPE =
LOGS_DATA_TYPE
# Default metric format name; presumably used when no metric format is configured — TODO confirm.
DEFAULT_METRIC_FORMAT_TYPE =
'graphite'

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Sumologic

Returns a new instance of Sumologic.



95
96
97
# File 'lib/fluent/plugin/out_sumologic.rb', line 95

# Constructs the plugin instance. No state is set up here; the call is
# forwarded unchanged to the parent class initializer.
def initialize
  super
end

Instance Method Details

#configure(conf) ⇒ Object

This method is called before starting.



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/fluent/plugin/out_sumologic.rb', line 104

# Validates the plugin configuration and creates the Sumo Logic connection.
#
# Checks, in order: the endpoint looks like a URL, +data_type+ (when given)
# is "logs" or "metrics", +log_format+ (when given and the data type is logs)
# is one of the supported formats, and +metrics_data_type+ (when given and the
# data type is metrics) is one of the supported metric formats.
#
# @param conf [#[]] configuration element, read with String keys
# @raise [Fluent::ConfigError] when any of the validated values is invalid
# @return [Object] whatever the parent class's +configure+ returns
def configure(conf)
  compat_parameters_convert(conf, :buffer)

  # NOTE(review): URI::regexp is unanchored, so any value that merely contains
  # a URL-like substring passes this check.
  unless conf['endpoint'] =~ URI::regexp
    raise Fluent::ConfigError, "Invalid SumoLogic endpoint url: #{conf['endpoint']}"
  end

  data_type = conf['data_type']
  unless data_type.nil? || data_type =~ /\A(?:logs|metrics)\z/
    raise Fluent::ConfigError, "Invalid data_type #{conf['data_type']} must be logs or metrics"
  end

  # An absent data_type is treated as logs for the purpose of this check.
  if data_type.nil? || data_type == LOGS_DATA_TYPE
    log_format = conf['log_format']
    unless log_format.nil? || log_format =~ /\A(?:json|text|json_merge|fields)\z/
      raise Fluent::ConfigError, "Invalid log_format #{conf['log_format']} must be text, json, json_merge or fields"
    end
  end

  if data_type == METRICS_DATA_TYPE && !conf['metrics_data_type'].nil?
    # Fixed: the pattern previously spelled "pronetheus", which rejected the
    # documented "prometheus" value that the error message itself advertises.
    unless conf['metrics_data_type'] =~ /\A(?:graphite|carbon2|prometheus)\z/
      raise Fluent::ConfigError, "Invalid metrics_data_type #{conf['metrics_data_type']} must be graphite or carbon2 or prometheus"
    end
  end

  # The connection is created before calling super, so it exists by the time
  # the parent class finishes its own configuration.
  @sumo_conn = SumologicConnection.new(conf['endpoint'], conf['verify_ssl'], conf['open_timeout'].to_i, conf['proxy_uri'], conf['disable_cookies'])
  super
end

#dump_collected_fields(log_fields) ⇒ Object



216
217
218
219
220
221
222
# File 'lib/fluent/plugin/out_sumologic.rb', line 216

# Serialises collected fields as a comma-separated list of "key=value" pairs.
#
# @param log_fields [Hash, nil] field names mapped to values
# @return [String, nil] the joined pairs, or nil when +log_fields+ is nil
def dump_collected_fields(log_fields)
  return log_fields if log_fields.nil?

  pairs = log_fields.map { |name, value| "#{name}=#{value}" }
  pairs.join(',')
end

#dump_log(log) ⇒ Object

Strip sumo_metadata and dump to json



163
164
165
166
167
168
169
170
171
172
173
# File 'lib/fluent/plugin/out_sumologic.rb', line 163

# Removes the '_sumo_metadata' entry from the record and serialises it to JSON.
#
# If the value stored under @log_key parses as JSON it is replaced by the
# parsed structure before dumping; any error while parsing is ignored and the
# record is dumped with the value left as it was. Note that +log+ is mutated.
#
# @param log [Hash] the record to serialise
# @return [String] the JSON text produced by Yajl.dump
def dump_log(log)
  log.delete('_sumo_metadata')
  begin
    decoded = Yajl::Parser.new.parse(log[@log_key])
    log[@log_key] = decoded
  rescue
    # The value is not parseable JSON; keep it unchanged.
  end
  Yajl.dump(log)
end

#format(tag, time, record) ⇒ Object



175
176
177
178
179
180
181
182
# File 'lib/fluent/plugin/out_sumologic.rb', line 175

# Serialises one event as a msgpack-encoded [time, record] pair.
#
# When +time+ exposes +nsec+, it is first converted to milliseconds
# (time * 1000 plus the nanosecond part reduced to whole milliseconds);
# otherwise it is stored as given. +tag+ is not used.
#
# @param tag [Object] unused
# @param time [Object] event time
# @param record [Object] event record
# @return [Object] the result of Array#to_msgpack
def format(tag, time, record)
  return [time, record].to_msgpack unless defined? time.nsec

  millis = time * 1000 + (time.nsec / 1000000)
  [millis, record].to_msgpack
end

#formatted_to_msgpack_binary ⇒ Object



184
185
186
# File 'lib/fluent/plugin/out_sumologic.rb', line 184

# Always returns true.
# NOTE(review): presumably tells the Fluentd buffer that #format emits msgpack
# binary, so that chunk.msgpack_each can be used in #write — confirm against
# the Fluentd output plugin API.
def formatted_to_msgpack_binary
  true
end

#merge_json(record) ⇒ Object

Used to merge log record into top level json



147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/fluent/plugin/out_sumologic.rb', line 147

# Merges a JSON-object log message into the top level of the record.
#
# If the value under @log_key is a String that (after stripping whitespace)
# starts with "{" and ends with "}" and parses as JSON, the parsed keys are
# merged into a copy of the record and the @log_key entry is removed from that
# copy. In every other case the record is returned unchanged. The input hash
# itself is never mutated.
#
# @param record [Hash] the event record
# @return [Hash] the merged copy, or +record+ itself when nothing was merged
def merge_json(record)
  raw = record[@log_key]
  # Only Strings can carry JSON text. A nil or already-structured value used to
  # raise NoMethodError on #strip; it is now left untouched.
  return record unless raw.is_a?(String)

  log = raw.strip
  return record unless log[0].eql?('{') && log[-1].eql?('}')

  begin
    merged = record.merge(JSON.parse(log))
    merged.delete(@log_key)
    merged
  rescue JSON::ParserError
    # Looked like JSON but is not; keep the record as it was.
    record
  end
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


99
100
101
# File 'lib/fluent/plugin/out_sumologic.rb', line 99

# Always returns true.
# NOTE(review): presumably declares to Fluentd that this plugin may run under
# multiple workers — confirm against the Fluentd plugin API.
def multi_workers_ready?
  true
end

#shutdown ⇒ Object

This method is called when shutting down.



142
143
144
# File 'lib/fluent/plugin/out_sumologic.rb', line 142

# Shutdown hook. Adds no behaviour of its own; it only forwards to the parent
# class implementation.
def shutdown
  super
end

#start ⇒ Object

This method is called when starting.



137
138
139
# File 'lib/fluent/plugin/out_sumologic.rb', line 137

# Start hook. Adds no behaviour of its own; it only forwards to the parent
# class implementation.
def start
  super
end

#sumo_fields(sumo_metadata) ⇒ Object



206
207
208
209
210
211
212
213
214
# File 'lib/fluent/plugin/out_sumologic.rb', line 206

# Parses the 'fields' entry of a record's Sumo metadata into a Hash.
#
# The entry is expected to be a comma-separated list of "key=value" pairs;
# each pair is split on the first "=" only, so values may themselves contain
# "=". A pair without "=" maps its text to nil.
#
# The +sumo_metadata+ parameter (named in this method's documented signature)
# had been lost from the definition, leaving an Array literal that cannot be
# split. It defaults to an empty Hash so zero-argument calls keep working.
#
# @param sumo_metadata [Hash] metadata hash, read with the String key 'fields'
# @return [Hash] field names mapped to values; empty when no fields are set
def sumo_fields(sumo_metadata = {})
  fields = sumo_metadata['fields'] || ""
  pairs = fields.split(',').map do |pair|
    k, v = pair.split('=', 2)
    [k, v]
  end
  Hash[pairs]
end

#sumo_key(sumo_metadata, chunk) ⇒ Object



188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/fluent/plugin/out_sumologic.rb', line 188

# Builds the "name:category:host" grouping key for a record.
#
# Each component is taken from the record's Sumo metadata ('source',
# 'category', 'host') and falls back to the plugin-level @source_name,
# @source_category and @source_host. Non-nil components are passed through
# extract_placeholders with the chunk; nil components render as empty strings.
#
# The +sumo_metadata+ parameter (named in this method's documented signature)
# had been lost from the definition, which made it a syntax error.
#
# @param sumo_metadata [Hash] metadata hash, read with String keys
# @param chunk [Object] buffer chunk handed to extract_placeholders
# @return [String] the three components joined by ":"
def sumo_key(sumo_metadata, chunk)
  source_name = sumo_metadata['source'] || @source_name
  source_name = extract_placeholders(source_name, chunk) unless source_name.nil?

  source_category = sumo_metadata['category'] || @source_category
  source_category = extract_placeholders(source_category, chunk) unless source_category.nil?

  source_host = sumo_metadata['host'] || @source_host
  source_host = extract_placeholders(source_host, chunk) unless source_host.nil?

  "#{source_name}:#{source_category}:#{source_host}"
end

#sumo_timestamp(time) ⇒ Object

Convert timestamp to 13 digit epoch if necessary



202
203
204
# File 'lib/fluent/plugin/out_sumologic.rb', line 202

# Normalises a timestamp to a 13-digit (millisecond) epoch value.
#
# A value whose decimal representation already has 13 characters is returned
# unchanged; anything else is multiplied by 1000.
#
# @param time [Numeric] epoch timestamp
# @return [Numeric] the timestamp, scaled by 1000 when it was not 13 digits long
def sumo_timestamp(time)
  return time if time.to_s.length == 13

  time * 1000
end

#write(chunk) ⇒ Object

This method is called on every flush interval. It writes the buffered chunk to Sumo Logic.



225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# File 'lib/fluent/plugin/out_sumologic.rb', line 225

# Flushes a buffer chunk: groups the chunk's records by their
# "name:category:host" key and publishes one newline-joined batch per key
# through @sumo_conn.
#
# Per record, the payload depends on @data_type and the log format (the
# record's '_sumo_metadata' 'log_format' entry, else @log_format):
# * logs / text       - the stripped value under @log_key
# * logs / json_merge - JSON dump of the record with a JSON log merged in
# * logs / fields     - JSON dump of the record; metadata fields are collected
# * logs / other      - JSON dump of the record
# * metrics           - the stripped value under @log_key
# Records that are not Hashes, or that yield a nil payload, are skipped.
#
# The +sumo_metadata+ local had been lost from this method (leaving a syntax
# error), and the publish call used `name =value` pseudo keyword arguments;
# both are restored as ordinary code with the same positional order.
#
# @param chunk [#msgpack_each] buffer chunk yielding (time, record) pairs
# @return [Object] the result of iterating the grouped messages
def write(chunk)
  messages_list = {}
  # Overwritten by every 'fields'-format record, so the value from the last
  # such record in the chunk is sent with every batch below.
  log_fields = nil

  # Sort messages
  chunk.msgpack_each do |time, record|
    # plugin dies randomly
    # https://github.com/uken/fluent-plugin-elasticsearch/commit/8597b5d1faf34dd1f1523bfec45852d380b26601#diff-ae62a005780cc730c558e3e4f47cc544R94
    next unless record.is_a? Hash

    # NOTE(review): the fallback hash uses the Symbol key :source while the
    # lookups on this hash use String keys — confirm whether the fallback is
    # meant to take effect.
    sumo_metadata = record.fetch('_sumo_metadata', {:source => record[@source_name_key] })
    key           = sumo_key(sumo_metadata, chunk)
    log_format    = sumo_metadata['log_format'] || @log_format

    # Strip any unwanted newlines
    record[@log_key].chomp! if record[@log_key] && record[@log_key].respond_to?(:chomp!)

    case @data_type
    when 'logs'
      case log_format
      when 'text'
        log = record[@log_key]
        unless log.nil?
          log.strip!
        end
      when 'json_merge'
        if @add_timestamp
          record = { @timestamp_key => sumo_timestamp(time) }.merge(record)
        end
        log = dump_log(merge_json(record))
      when 'fields'
        log_fields = sumo_fields(sumo_metadata)
        if @add_timestamp
          record = { @timestamp_key => sumo_timestamp(time) }.merge(record)
        end
        log = dump_log(record)
      else
        if @add_timestamp
          record = { @timestamp_key => sumo_timestamp(time) }.merge(record)
        end
        log = dump_log(record)
      end
    when 'metrics'
      log = record[@log_key]
      unless log.nil?
        log.strip!
      end
    end

    # Group the payload under its source key, creating the bucket on first use.
    (messages_list[key] ||= []).push(log) unless log.nil?
  end

  # Push logs to sumo
  messages_list.each do |key, messages|
    source_name, source_category, source_host = key.split(':')
    # Arguments are positional, in the order: payload, host, category, name,
    # data type, metric format, collected fields.
    @sumo_conn.publish(
        messages.join("\n"),
        source_host,
        source_category,
        source_name,
        @data_type,
        @metric_data_format,
        dump_collected_fields(log_fields)
    )
  end

end