Class: Fluent::Plugin::Sumologic

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_sumologic.rb

Constant Summary collapse

DEFAULT_BUFFER_TYPE =
"memory"

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Sumologic

Returns a new instance of Sumologic.



68
69
70
# File 'lib/fluent/plugin/out_sumologic.rb', line 68

# Creates the plugin instance.
# No plugin-specific state is set up here; the bare `super` hands
# construction over to the parent class unchanged.
def initialize
  super
end

Instance Method Details

#configure(conf) ⇒ Object

This method is called before starting.



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/fluent/plugin/out_sumologic.rb', line 73

# Checks the raw configuration and builds the Sumo Logic connection.
#
# The values are read straight from +conf+ by string key, before the parent
# class `configure` runs.
#
# NOTE(review): because raw values are used, a 'log_format' that is absent
# from +conf+ fails validation, 'verify_ssl' is forwarded exactly as stored
# in +conf+, and a missing 'open_timeout' becomes 0 via `nil.to_i` — confirm
# these are intended against the parameter declarations (not visible here).
#
# @param conf configuration object indexed with 'endpoint', 'log_format',
#   'verify_ssl' and 'open_timeout'
# @raise [Fluent::ConfigError] when 'endpoint' does not match URI::regexp or
#   'log_format' is not exactly text, json or json_merge
def configure(conf)
  # Helper defined outside this block; presumably maps legacy buffer settings.
  compat_parameters_convert(conf, :buffer)

  endpoint = conf['endpoint']
  raise Fluent::ConfigError, "Invalid SumoLogic endpoint url: #{endpoint}" unless endpoint =~ URI::regexp

  log_format = conf['log_format']
  unless log_format =~ /\A(?:json|text|json_merge)\z/
    raise Fluent::ConfigError, "Invalid log_format #{log_format} must be text, json or json_merge"
  end

  timeout = conf['open_timeout'].to_i
  @sumo_conn = SumologicConnection.new(endpoint, conf['verify_ssl'], timeout)
  super
end

#dump_log(log) ⇒ Object

Strips the `_sumo_metadata` key from the record and dumps the record to JSON.



116
117
118
119
# File 'lib/fluent/plugin/out_sumologic.rb', line 116

# Serialises a record with Yajl after dropping its '_sumo_metadata' entry.
#
# The hash passed in is modified in place: the key is deleted from it.
#
# @param log [Hash] record to serialise
# @return whatever Yajl.dump returns for the stripped record
def dump_log(log)
  stripped = log.tap { |entry| entry.delete('_sumo_metadata') }
  Yajl.dump(stripped)
end

#expand_param(param, tag, time, record) ⇒ Object



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/fluent/plugin/out_sumologic.rb', line 153

# Expands every `${ ... }` placeholder in +param+ by evaluating the Ruby
# expression between the braces and splicing in the result.
#
# Expressions may refer to `tag`, `tag_parts` (the tag split on @delimiter),
# `time` and `record`. Substitution is a single pass over +param+: text
# produced by one placeholder is never scanned again, so a record value that
# happens to contain `${...}` is inserted literally instead of being
# evaluated. Results are converted with `to_s`, so a nil result becomes an
# empty string rather than raising.
#
# SECURITY: the placeholder body is passed to `eval`. It must only ever come
# from trusted configuration, never from event data.
#
# @param param [String, nil] template; returned untouched when it holds no
#   `${ ... }` placeholder
# @param tag [String, nil] event tag
# @param time [Object, nil] event time
# @param record [Hash, nil] event record
# @return [String, nil] the expanded string. When a placeholder needs
#   `record`, `tag_parts` or `time` and that value is nil, the bare
#   expression text of that placeholder is returned instead (existing
#   behaviour, kept for compatibility).
def expand_param(param, tag, time, record)
  return param if (param =~ /\$\{.+\}/).nil?

  # Only split the tag when some placeholder actually indexes tag_parts.
  # The local names tag_parts/tag/time/record are part of the contract:
  # the eval'd expressions reference them through this method's binding.
  tag_parts = tag.split(@delimiter) unless (param =~ /tag_parts\[.+\]/).nil? || tag.nil?

  # gsub with a block inserts the block result verbatim (no back-reference
  # interpretation) and does not mutate +param+, so frozen strings are fine.
  param.gsub(/\$\{(.+?)\}/) do
    to_eval = Regexp.last_match(1)

    return to_eval if to_eval =~ /record\[.+\]/ && record.nil?
    return to_eval if to_eval =~ /tag_parts\[.+\]/ && tag_parts.nil?
    return to_eval if to_eval =~ /time/ && time.nil?

    eval(to_eval).to_s
  end
end

#format(tag, time, record) ⇒ Object



121
122
123
124
125
126
127
128
# File 'lib/fluent/plugin/out_sumologic.rb', line 121

# Packs one event as a two-element [timestamp, record] msgpack array.
#
# When +time+ exposes `nsec`, the timestamp is converted to milliseconds
# (time * 1000 plus the nanosecond part divided down to milliseconds);
# otherwise +time+ is stored as given.
#
# @param tag event tag (not used here)
# @param time event time
# @param record event record
# @return the result of calling `to_msgpack` on the [timestamp, record] pair
def format(tag, time, record)
  stamp = defined?(time.nsec) ? time * 1000 + (time.nsec / 1000000) : time
  [stamp, record].to_msgpack
end

#formatted_to_msgpack_binary ⇒ Object



130
131
132
# File 'lib/fluent/plugin/out_sumologic.rb', line 130

# Always returns true.
# NOTE(review): presumably signals to the framework that #format emits
# msgpack-encoded data — confirm against the Fluentd output plugin API.
def formatted_to_msgpack_binary
  true
end

#merge_json(record) ⇒ Object

Used to merge log record into top level json



100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/fluent/plugin/out_sumologic.rb', line 100

# Lifts a JSON object stored as text under @log_key into the top level of
# the record.
#
# The merge only happens when the value under @log_key is a String whose
# stripped form starts with '{', ends with '}' and parses as JSON. Keys
# already present in +record+ win over keys from the parsed JSON, and the
# @log_key entry itself is removed from the result. In every other case
# (key missing, value not a String, not JSON-shaped, parse failure) the
# record is returned unchanged instead of raising.
#
# @param record [Hash] event record; never mutated
# @return [Hash] a new merged hash, or +record+ itself when nothing merged
def merge_json(record)
  raw = record[@log_key]
  # Non-string values (nil, nested hashes, numbers) have no `strip`.
  return record unless raw.is_a?(String)

  text = raw.strip
  return record unless text.start_with?('{') && text.end_with?('}')

  begin
    merged = JSON.parse(text).merge(record)
    merged.delete(@log_key)
    merged
  rescue JSON::ParserError
    # Looked like JSON but was not; keep the record as it was.
    record
  end
end

#shutdown ⇒ Object

This method is called when shutting down.



95
96
97
# File 'lib/fluent/plugin/out_sumologic.rb', line 95

# Shutdown hook. Adds no behaviour of its own; the bare `super` delegates
# to the parent class implementation.
def shutdown
  super
end

#start ⇒ Object

This method is called when starting.



90
91
92
# File 'lib/fluent/plugin/out_sumologic.rb', line 90

# Start hook. Adds no behaviour of its own; the bare `super` delegates
# to the parent class implementation.
def start
  super
end

#sumo_key(sumo_metadata, record, tag) ⇒ Object



134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/fluent/plugin/out_sumologic.rb', line 134

# Builds the "name:category:host" grouping key for one record.
#
# Each component is taken from +sumo_metadata+ ('source', 'category',
# 'host') and falls back to the plugin-level @source_name,
# @source_category and @source_host when the metadata entry is nil or
# absent. Every component is then passed through expand_param (with a nil
# time) before being joined.
#
# NOTE(review): the components are joined with ':' without escaping, so a
# component that itself contains ':' cannot be recovered by splitting the
# key on ':'.
#
# @param sumo_metadata [Hash] per-record overrides
# @param record [Hash] event record, forwarded to expand_param
# @param tag [String] event tag, forwarded to expand_param
# @return [String] "<source_name>:<source_category>:<source_host>"
def sumo_key(sumo_metadata, record, tag)
  source_name = sumo_metadata['source'] || @source_name
  source_name = expand_param(source_name, tag, nil, record)

  source_category = sumo_metadata['category'] || @source_category
  source_category = expand_param(source_category, tag, nil, record)

  source_host = sumo_metadata['host'] || @source_host
  source_host = expand_param(source_host, tag, nil, record)

  "#{source_name}:#{source_category}:#{source_host}"
end

#sumo_timestamp(time) ⇒ Object

Convert timestamp to 13 digit epoch if necessary



148
149
150
# File 'lib/fluent/plugin/out_sumologic.rb', line 148

# Normalises a timestamp to the 13-character (millisecond) form.
#
# A value whose `to_s` is already 13 characters long is returned as is;
# anything else is multiplied by 1000.
#
# @param time timestamp value
# @return +time+ itself or +time+ * 1000
def sumo_timestamp(time)
  return time if time.to_s.size == 13

  time * 1000
end

#write(chunk) ⇒ Object

This method is called every flush interval. It writes the buffer chunk to Sumo Logic.



182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# File 'lib/fluent/plugin/out_sumologic.rb', line 182

# Flushes one buffer chunk: groups its records by Sumo source key and
# publishes each group through @sumo_conn.
#
# For every [time, record] pair in the chunk:
# * non-Hash records are skipped;
# * per-record settings come from record['_sumo_metadata'], defaulting to
#   a hash whose 'source' is record[@source_name_key];
# * the log line is built according to the effective log format
#   ('text' uses the @log_key value, 'json_merge' merges embedded JSON via
#   merge_json, anything else dumps the record as JSON with a :timestamp);
# * records that yield no log line (nil) are dropped.
#
# Each group is then published as one newline-joined body.
#
# @param chunk buffer chunk; must provide `metadata.tag` and `msgpack_each`
# @return the grouped messages hash (result of the final `each`)
def write(chunk)
  tag = chunk.metadata.tag
  # key => array of log lines, created on first use
  messages_list = Hash.new { |groups, group_key| groups[group_key] = [] }

  # Sort messages
  chunk.msgpack_each do |time, record|
    # plugin dies randomly
    # https://github.com/uken/fluent-plugin-elasticsearch/commit/8597b5d1faf34dd1f1523bfec45852d380b26601#diff-ae62a005780cc730c558e3e4f47cc544R94
    next unless record.is_a? Hash

    sumo_metadata = record.fetch('_sumo_metadata', { 'source' => record[@source_name_key] })
    key           = sumo_key(sumo_metadata, record, tag)
    log_format    = sumo_metadata['log_format'] || @log_format

    # Strip any unwanted newlines. Only strings are touched: the value may
    # also be a number or nested structure, which has no chomp!/strip!.
    raw = record[@log_key]
    raw.chomp! if raw.is_a?(String)

    log = case log_format
          when 'text'
            raw.strip! if raw.is_a?(String)
            raw
          when 'json_merge'
            dump_log(merge_json({ :timestamp => sumo_timestamp(time) }.merge(record)))
          else
            dump_log({ :timestamp => sumo_timestamp(time) }.merge(record))
          end

    messages_list[key].push(log) unless log.nil?
  end

  # Push logs to sumo
  messages_list.each do |key, messages|
    source_name, source_category, source_host = key.split(':')
    # publish takes positional arguments in this order: body, host,
    # category, name.
    @sumo_conn.publish(messages.join("\n"), source_host, source_category, source_name)
  end
end