Class: Fluent::SplunkHTTPEventcollectorOutput

Inherits:
  BufferedOutput
    • Object
Defined in:
lib/fluent/plugin/out_splunk-http-eventcollector.rb

Class Method Summary

  • .placeholder_expander(log) ⇒ Object

Instance Method Summary

  • #initialize ⇒ SplunkHTTPEventcollectorOutput (constructor)
  • #configure(conf) ⇒ Object
  • #start ⇒ Object
  • #shutdown ⇒ Object
  • #format(tag, time, record) ⇒ Object
  • #write(chunk) ⇒ Object
  • #push_buffer(body) ⇒ Object
  • #convert_to_utf8(input) ⇒ Object
  • #numfmt(input) ⇒ Object

Constructor Details

#initialize ⇒ SplunkHTTPEventcollectorOutput

Constructor. Called when the plugin instance is created; loads the net/http/persistent and openssl libraries the plugin depends on.



# File 'lib/fluent/plugin/out_splunk-http-eventcollector.rb', line 70

def initialize
  super
  log.trace "splunk-http-eventcollector(initialize) called"
  require 'net/http/persistent'
  require 'openssl'
end

Class Method Details

.placeholder_expander(log) ⇒ Object



# File 'lib/fluent/plugin/out_splunk-http-eventcollector.rb', line 79

def self.placeholder_expander(log)
  # Use internal class in order to expand placeholder
  if defined?(Fluent::Filter) # for v0.12, built-in PlaceholderExpander
    begin
      require 'fluent/plugin/filter_record_transformer'
      if defined?(Fluent::Plugin::RecordTransformerFilter::PlaceholderExpander)
        # for v0.14
        return Fluent::Plugin::RecordTransformerFilter::PlaceholderExpander.new(log: log)
      else
        # for v0.12
        return Fluent::RecordTransformerFilter::PlaceholderExpander.new(log: log)
      end
    rescue LoadError => e
      raise ConfigError, "cannot find filter_record_transformer plugin: #{e.message}"
    end
  else # for v0.10, use PlaceholderExapander in fluent-plugin-record-reformer plugin
    begin
      require 'fluent/plugin/out_record_reformer.rb'
      return Fluent::RecordReformerOutput::PlaceholderExpander.new(log: log)
    rescue LoadError => e
      raise ConfigError, "cannot find fluent-plugin-record-reformer: #{e.message}"
    end
  end
end
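
The expander returned here is consumed by #format below via prepare_placeholders and expand. A minimal sketch of that interaction, with made-up tag and record values; the ${...} syntax comes from the record_transformer / record_reformer plugins this method loads, and the result shown is illustrative:

# Assumes a plugin `log` object is in scope.
expander = Fluent::SplunkHTTPEventcollectorOutput.placeholder_expander(log)

placeholders = expander.prepare_placeholders(
  'tag'       => 'app.nginx.access',
  'tag_parts' => %w[app nginx access],
  'hostname'  => 'web-01',
  'record'    => { 'message' => 'GET /' }
)

expander.expand('${tag_parts[1]}.${hostname}', placeholders)
# => "nginx.web-01"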

Instance Method Details

#configure(conf) ⇒ Object

This method is called before starting. ‘conf’ is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.



# File 'lib/fluent/plugin/out_splunk-http-eventcollector.rb', line 107

def configure(conf)
  super
  log.trace "splunk-http-eventcollector(configure) called"
  begin
    @splunk_uri = URI "#{@protocol}://#{@server}/services/collector"
  rescue
    raise ConfigError, "Unable to parse the server into a URI."
  end

  @placeholder_expander = Fluent::SplunkHTTPEventcollectorOutput.placeholder_expander(log)
  @hostname = Socket.gethostname
  # TODO Add other robust input/syntax checks.
end
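
The instance variables used here and in #start/#format (@protocol, @server, @token, @verify, @all_items, ...) imply configuration parameters along the lines below. A hedged sketch using Fluentd v0.12's test driver; the parameter names are assumed to mirror those variables, so check the plugin README for the authoritative list:

require 'fluent/test'
require 'fluent/plugin/out_splunk-http-eventcollector'

Fluent::Test.setup

# Assumed parameter names and example values.
CONFIG = %[
  server    splunk.example.com:8088
  protocol  https
  token     YOUR-HEC-TOKEN
  verify    false
  all_items true
]

driver = Fluent::Test::BufferedOutputTestDriver.new(Fluent::SplunkHTTPEventcollectorOutput)
driver.configure(CONFIG)
driver.instance.instance_variable_get(:@splunk_uri)
# => #<URI::HTTPS https://splunk.example.com:8088/services/collector>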

#convert_to_utf8(input) ⇒ Object

Encode as UTF-8. If ‘coerce_to_utf8’ is set to true in the config, any non-UTF-8 character is replaced by the string specified by ‘non_utf8_replacement_string’. If ‘coerce_to_utf8’ is set to false, any non-UTF-8 character causes the plugin to raise an error. Thanks to github.com/GoogleCloudPlatform/fluent-plugin-google-cloud/blob/dbc28575/lib/fluent/plugin/out_google_cloud.rb#L1284



# File 'lib/fluent/plugin/out_splunk-http-eventcollector.rb', line 293

def convert_to_utf8(input)
  if input.is_a?(Hash)
    record = {}
    input.each do |key, value|
      record[convert_to_utf8(key)] = convert_to_utf8(value)
    end

    return record
  end
  return input.map { |value| convert_to_utf8(value) } if input.is_a?(Array)
  return input unless input.respond_to?(:encode)

  if @coerce_to_utf8
    input.encode(
      'utf-8',
      invalid: :replace,
      undef: :replace,
      replace: @non_utf8_replacement_string)
  else
    begin
      input.encode('utf-8')
    rescue EncodingError
      @log.error 'Encountered encoding issues potentially due to non ' \
                 'UTF-8 characters. To allow non-UTF-8 characters and ' \
                 'replace them with spaces, please set "coerce_to_utf8" ' \
                 'to true.'
      raise
    end
  end
end
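
The behaviour hinges on String#encode's :invalid/:undef options. A small standalone illustration, assuming ' ' as the replacement string:

raw = "caf\xE9".dup.force_encoding('ASCII-8BIT')   # \xE9 is not valid UTF-8 on its own

# coerce_to_utf8 true: the offending byte becomes the replacement string.
raw.encode('utf-8', invalid: :replace, undef: :replace, replace: ' ')
# => "caf "

# coerce_to_utf8 false: the same byte raises, which the rescue above logs and re-raises.
# raw.encode('utf-8')   # => Encoding::UndefinedConversionError (a kind of EncodingError)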

#format(tag, time, record) ⇒ Object

This method is called when an event reaches Fluentd (like an unbuffered emit()). It converts the event to a raw string.



# File 'lib/fluent/plugin/out_splunk-http-eventcollector.rb', line 147

def format(tag, time, record)
  #log.trace "splunk-http-eventcollector(format) called"
  # Basic object for Splunk. Note explicit type-casting to avoid accidental errors.

  placeholder_values = {
    'tag' => tag,
    'tag_parts' => tag.split('.'),
    'hostname' => @hostname,
    'record' => record
  }

  placeholders = @placeholder_expander.prepare_placeholders(placeholder_values)

  splunk_object = Hash[
      "time" => time.to_i,
      "source" => if @source.nil? then tag.to_s else @placeholder_expander.expand(@source, placeholders) end,
      "sourcetype" => @placeholder_expander.expand(@sourcetype.to_s, placeholders),
      "host" => @placeholder_expander.expand(@host.to_s, placeholders),
      "index" =>  @placeholder_expander.expand(@index, placeholders)
    ]
  # TODO: parse different source types as expected: KVP, JSON, TEXT
  if @all_items
    splunk_object["event"] = convert_to_utf8(record)
  else
    splunk_object["event"] = convert_to_utf8(record["message"])
  end

  json_event = splunk_object.to_json
  #log.debug "Generated JSON(#{json_event.class.to_s}): #{json_event.to_s}"
  #log.debug "format: returning: #{[tag, record].to_json.to_s}"
  json_event
end
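
The resulting JSON is the envelope the Splunk HTTP Event Collector expects. With all_items enabled and illustrative settings (sourcetype "fluentd", host "web-01", index "main") on a hypothetical instance `output`, a formatted event would look like:

require 'json'

json = output.format('app.nginx', 1489600000, { 'message' => 'GET / 200' })

JSON.parse(json)
# => {"time"       => 1489600000,
#     "source"     => "app.nginx",               # falls back to the tag when `source` is unset
#     "sourcetype" => "fluentd",
#     "host"       => "web-01",
#     "index"      => "main",
#     "event"      => {"message" => "GET / 200"}}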

#numfmt(input) ⇒ Object

Formats a number with comma thousands separators for use in log messages.



# File 'lib/fluent/plugin/out_splunk-http-eventcollector.rb', line 283

def numfmt(input)
  input.to_s.reverse.gsub(/(\d{3})(?=\d)/, '\1,').reverse
end
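
The reverse/gsub trick inserts a comma before every third digit counted from the right, for example:

# Inside the plugin instance:
numfmt(1234567)    # => "1,234,567"
numfmt(999)        # => "999"
numfmt("1048576")  # => "1,048,576"  (anything responding to #to_s works)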

#push_buffer(body) ⇒ Object

POSTs one request body to the Splunk HTTP Event Collector endpoint, retrying up to post_retry_max times on server errors. Called from #write.



# File 'lib/fluent/plugin/out_splunk-http-eventcollector.rb', line 241

def push_buffer(body)
  post = Net::HTTP::Post.new @splunk_uri.request_uri
  post.body = body
  log.debug "POST #{@splunk_uri}"
  if @test_mode
    log.debug "TEST_MODE Payload: #{body}"
    return
  end
  # retry up to :post_retry_max times
  1.upto(@post_retry_max) do |c|
    response = @http.request @splunk_uri, post
    log.debug "=>(#{c}/#{numfmt(@post_retry_max)}) #{response.code} " +
        "(#{response.message})"
    # TODO check the actual server response too (it's JSON)
    if response.code == "200"  # and...
      # success
      break
    # TODO check 40X response within post_retry_max and retry
    elsif response.code.match(/^50/) and c < @post_retry_max
      # retry
      log.warn "#{@splunk_uri}: Server error #{response.code} (" +
          "#{response.message}). Retrying in #{@post_retry_interval} " +
          "seconds.\n#{response.body}"
      sleep @post_retry_interval
      next
    elsif response.code.match(/^40/)
      # user error
      log.error "#{@splunk_uri}: #{response.code} (#{response.message})\n#{response.body}"
      break
    elsif c < @post_retry_max
      # retry
      log.debug "#{@splunk_uri}: Retrying..."
      sleep @post_retry_interval
      next
    else
      # other errors. fluentd will retry processing on exception
      # FIXME: this may duplicate logs when using multiple buffers
      raise "#{@splunk_uri}: #{response.message}\n#{response.body}"
    end # If response.code
  end # 1.upto(@post_retry_max)
end
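
On the TODO above: the HTTP Event Collector answers with a small JSON body such as {"text":"Success","code":0}, so the 200 branch could also inspect it. A sketch only, not part of the plugin; `response` is the Net::HTTP response obtained inside the retry loop:

require 'json'

body = begin
  JSON.parse(response.body)
rescue JSON::ParserError
  {}
end

if response.code == '200' && body['code'] == 0
  log.debug "Splunk HEC acknowledged the batch: #{body['text']}"
else
  log.warn "Splunk HEC reported: #{body['text'] || response.message}"
end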

#shutdownObject

This method is called when shutting down. Shutdown the thread and close sockets or files here.



# File 'lib/fluent/plugin/out_splunk-http-eventcollector.rb', line 137

def shutdown
  super
  log.trace "splunk-http-eventcollector(shutdown) called"

  @http.shutdown
  log.trace "shutdown from splunk-http-eventcollector"
end

#startObject

This method is called when starting. Open sockets or files here.



# File 'lib/fluent/plugin/out_splunk-http-eventcollector.rb', line 123

def start
  super
  log.trace "splunk-http-eventcollector(start) called"
  @http = Net::HTTP::Persistent.new 'fluent-plugin-splunk-http-eventcollector'
  @http.verify_mode = OpenSSL::SSL::VERIFY_NONE unless @verify
  @http.override_headers['Content-Type'] = 'application/json'
  @http.override_headers['User-Agent'] = 'fluent-plugin-splunk-http-eventcollector/0.0.1'
  @http.override_headers['Authorization'] = "Splunk #{@token}"

  log.trace "initialized for splunk-http-eventcollector"
end
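
For orientation, each flush then boils down to an HTTPS POST carrying the token in the Authorization header. A one-shot net/http equivalent with an example endpoint and token (the plugin itself reuses the single persistent connection created above):

require 'net/http'
require 'json'

uri  = URI('https://splunk.example.com:8088/services/collector')        # example endpoint
post = Net::HTTP::Post.new(uri.request_uri)
post['Authorization'] = 'Splunk 00000000-0000-0000-0000-000000000000'   # example token
post['Content-Type']  = 'application/json'
post.body = { 'time' => Time.now.to_i, 'event' => { 'message' => 'hello' } }.to_json

Net::HTTP.start(uri.host, uri.port, use_ssl: true) do |http|
  response = http.request(post)
  puts "#{response.code} #{response.body}"
end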

#write(chunk) ⇒ Object

By this point, fluentd has decided its buffer is full and it’s time to flush it. chunk.read is a concatenated string of JSON.to_s objects. Simply POST them to Splunk and go about our life. This method is called every flush interval. Write the buffer chunk to files or databases here. ‘chunk’ is a buffer chunk that includes multiple formatted events. You can use ‘data = chunk.read’ to get all events and ‘chunk.open {|io| … }’ to get IO objects.

NOTE! This method is called by an internal thread, not Fluentd’s main thread, so IO waits do not affect other plugins.



# File 'lib/fluent/plugin/out_splunk-http-eventcollector.rb', line 190

def write(chunk)
  log.trace "splunk-http-eventcollector(write) called"

  # Break the concatenated string of JSON-formatted events into an Array
  split_chunk = chunk.read.split("}{").each do |x|
    # Reconstruct the opening{/closing} that #split() strips off.
    x.prepend("{") unless x.start_with?("{")
    x << "}" unless x.end_with?("}")
  end
  log.debug "Pushing #{numfmt(split_chunk.size)} events (" +
      "#{numfmt(chunk.read.bytesize)} bytes) to Splunk."
  # If fluentd is pushing too much data to Splunk at once, split up the payload
  # Don't care about the number of events so much as the POST size (bytes)
  #if split_chunk.size > @batch_event_limit
  #  log.warn "Fluentd is attempting to push #{numfmt(split_chunk.size)} " +
  #      "events in a single push to Splunk. The configured limit is " +
  #      "#{numfmt(@batch_event_limit)}."
  #end
  if chunk.read.bytesize > @batch_size_limit
    log.warn "Fluentd is attempting to push #{numfmt(chunk.read.bytesize)} " +
        "bytes in a single push to Splunk. The configured limit is " +
        "#{numfmt(@batch_size_limit)} bytes."
    newbuffer = Array.new
    split_chunk_counter = 0
    split_chunk.each do |c|
      split_chunk_counter = split_chunk_counter + 1
      #log.debug "(#{numfmt(split_chunk_counter)}/#{numfmt(split_chunk.size)}) " +
      #    "newbuffer.bytesize=#{numfmt(newbuffer.join.bytesize)} + " +
      #    "c.bytesize=#{numfmt(c.bytesize)} ????"
      if newbuffer.join.bytesize + c.bytesize < @batch_size_limit
        #log.debug "Appended!"
        newbuffer << c
      else
        # Reached the limit - push the current newbuffer.join, and reset
        #log.debug "Would exceed limit. Flushing newbuffer and continuing."
        log.debug "(#{numfmt(split_chunk_counter)}/#{numfmt(split_chunk.size)}) " +
            "newbuffer.bytesize=#{numfmt(newbuffer.join.bytesize)} + " +
            "c.bytesize=#{numfmt(c.bytesize)} > #{numfmt(@batch_size_limit)}, " +
            "flushing current buffer to Splunk."
        push_buffer newbuffer.join
        newbuffer = Array c
      end # if/else buffer fits limit
    end # split_chunk.each
    # Push anything left over.
    push_buffer newbuffer.join unless newbuffer.empty?  # Array#size is always truthy, so guard on empty? instead
    return
  else
    return push_buffer chunk.read
  end # if chunk.read.bytesize > @batch_size_limit
end
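
The "}{" re-splitting at the top of this method can be seen in isolation below; note it assumes no event body itself contains the literal sequence "}{" at a brace boundary:

chunk_data = '{"event":"a"}{"event":"b"}{"event":"c"}'   # what chunk.read looks like

events = chunk_data.split('}{').each do |x|
  x.prepend('{') unless x.start_with?('{')
  x << '}'       unless x.end_with?('}')
end

events
# => ["{\"event\":\"a\"}", "{\"event\":\"b\"}", "{\"event\":\"c\"}"]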