Class: Fluent::SplunkHTTPEventcollectorOutput
- Inherits: BufferedOutput
  - Object
  - BufferedOutput
  - Fluent::SplunkHTTPEventcollectorOutput
- Defined in: lib/fluent/plugin/out_splunk-http-eventcollector.rb
Class Method Summary
Instance Method Summary
-
#configure(conf) ⇒ Object
This method is called before starting.
-
#convert_to_utf8(input) ⇒ Object
Encode as UTF-8.
-
#format(tag, time, record) ⇒ Object
This method is called when an event reaches Fluentd.
-
#initialize ⇒ SplunkHTTPEventcollectorOutput
constructor
Constructor; called when the plugin instance is created.
-
#numfmt(input) ⇒ Object
Format a number with comma thousands separators, for readable log messages.
-
#push_buffer(body) ⇒ Object
POST a request body to the Splunk HTTP Event Collector, retrying on server errors.
-
#shutdown ⇒ Object
This method is called when shutting down.
-
#start ⇒ Object
This method is called when starting.
-
#write(chunk) ⇒ Object
By this point, fluentd has decided its buffer is full and it’s time to flush it.
Constructor Details
#initialize ⇒ SplunkHTTPEventcollectorOutput
Constructor; called when the plugin instance is created. Loads the required libraries.
# File 'lib/fluent/plugin/out_splunk-http-eventcollector.rb', line 70

def initialize
  super
  log.trace "splunk-http-eventcollector(initialize) called"
  require 'net/http/persistent'
  require 'openssl'
end
Class Method Details
.placeholder_expander(log) ⇒ Object
# File 'lib/fluent/plugin/out_splunk-http-eventcollector.rb', line 79

def self.placeholder_expander(log)
  # Use internal class in order to expand placeholder
  if defined?(Fluent::Filter) # for v0.12, built-in PlaceholderExpander
    begin
      require 'fluent/plugin/filter_record_transformer'
      if defined?(Fluent::Plugin::RecordTransformerFilter::PlaceholderExpander)
        # for v0.14
        return Fluent::Plugin::RecordTransformerFilter::PlaceholderExpander.new(log: log)
      else
        # for v0.12
        return Fluent::RecordTransformerFilter::PlaceholderExpander.new(log: log)
      end
    rescue LoadError => e
      raise ConfigError, "cannot find filter_record_transformer plugin: #{e.message}"
    end
  else
    # for v0.10, use PlaceholderExpander in fluent-plugin-record-reformer plugin
    begin
      require 'fluent/plugin/out_record_reformer.rb'
      return Fluent::RecordReformerOutput::PlaceholderExpander.new(log: log)
    rescue LoadError => e
      raise ConfigError, "cannot find fluent-plugin-record-reformer: #{e.message}"
    end
  end
end
Instance Method Details
#configure(conf) ⇒ Object
This method is called before starting. 'conf' is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.
# File 'lib/fluent/plugin/out_splunk-http-eventcollector.rb', line 107

def configure(conf)
  super
  log.trace "splunk-http-eventcollector(configure) called"
  begin
    @splunk_uri = URI "#{@protocol}://#{@server}/services/collector"
  rescue
    raise ConfigError, "Unable to parse the server into a URI."
  end
  @placeholder_expander = Fluent::SplunkHTTPEventcollectorOutput.placeholder_expander(log)
  @hostname = Socket.gethostname
  # TODO Add other robust input/syntax checks.
end
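The endpoint configure builds is always `<protocol>://<server>/services/collector`. A quick standalone sketch (the server value is illustrative, not a default):

```ruby
require 'uri'

# Mirror of what configure does: derive the collector endpoint from the
# `protocol` and `server` settings ("splunk.example.com:8088" is an
# illustrative value).
protocol = 'https'
server   = 'splunk.example.com:8088'
splunk_uri = URI "#{protocol}://#{server}/services/collector"

splunk_uri.host         # => "splunk.example.com"
splunk_uri.port         # => 8088
splunk_uri.request_uri  # => "/services/collector"
```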
#convert_to_utf8(input) ⇒ Object
Encode as UTF-8. If 'coerce_to_utf8' is set to true in the config, any non-UTF-8 character is replaced by the string specified by 'non_utf8_replacement_string'. If 'coerce_to_utf8' is set to false, any non-UTF-8 character causes the plugin to raise an error. Thanks to github.com/GoogleCloudPlatform/fluent-plugin-google-cloud/blob/dbc28575/lib/fluent/plugin/out_google_cloud.rb#L1284
# File 'lib/fluent/plugin/out_splunk-http-eventcollector.rb', line 293

def convert_to_utf8(input)
  if input.is_a?(Hash)
    record = {}
    input.each do |key, value|
      record[convert_to_utf8(key)] = convert_to_utf8(value)
    end
    return record
  end
  return input.map { |value| convert_to_utf8(value) } if input.is_a?(Array)
  return input unless input.respond_to?(:encode)

  if @coerce_to_utf8
    input.encode(
      'utf-8',
      invalid: :replace,
      undef: :replace,
      replace: @non_utf8_replacement_string)
  else
    begin
      input.encode('utf-8')
    rescue EncodingError
      @log.error 'Encountered encoding issues potentially due to non ' \
                 'UTF-8 characters. To allow non-UTF-8 characters and ' \
                 'replace them with spaces, please set "coerce_to_utf8" ' \
                 'to true.'
      raise
    end
  end
end
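A sketch of the two modes on a byte string holding a non-UTF-8 byte (the replacement string of a single space stands in for 'non_utf8_replacement_string'):

```ruby
# An ASCII-8BIT byte string, as might arrive from a log source;
# \xE9 is a bare Latin-1 e-acute, invalid as UTF-8 on its own.
bad = "caf\xE9".b

# coerce_to_utf8 true: the offending byte becomes the replacement string.
coerced = bad.encode('utf-8', invalid: :replace, undef: :replace, replace: ' ')
# coerced == "caf "

# coerce_to_utf8 false: the same conversion raises an EncodingError,
# which the plugin logs and then re-raises.
begin
  bad.encode('utf-8')
rescue EncodingError => e
  # e.g. Encoding::UndefinedConversionError
end
```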
#format(tag, time, record) ⇒ Object
This method is called when an event reaches Fluentd (like an unbuffered emit()). Convert the event to a raw string.
# File 'lib/fluent/plugin/out_splunk-http-eventcollector.rb', line 147

def format(tag, time, record)
  #log.trace "splunk-http-eventcollector(format) called"
  # Basic object for Splunk. Note explicit type-casting to avoid accidental errors.
  placeholder_values = {
    'tag' => tag,
    'tag_parts' => tag.split('.'),
    'hostname' => @hostname,
    'record' => record
  }

  placeholders = @placeholder_expander.prepare_placeholders(placeholder_values)

  splunk_object = Hash[
    "time" => time.to_i,
    "source" => if @source.nil? then tag.to_s else @placeholder_expander.expand(@source, placeholders) end,
    "sourcetype" => @placeholder_expander.expand(@sourcetype.to_s, placeholders),
    "host" => @placeholder_expander.expand(@host.to_s, placeholders),
    "index" => @placeholder_expander.expand(@index, placeholders)
  ]
  # TODO: parse different source types as expected: KVP, JSON, TEXT
  if @all_items
    splunk_object["event"] = convert_to_utf8(record)
  else
    splunk_object["event"] = convert_to_utf8(record["message"])
  end

  json_event = splunk_object.to_json
  #log.debug "Generated JSON(#{json_event.class.to_s}): #{json_event.to_s}"
  #log.debug "format: returning: #{[tag, record].to_json.to_s}"
  json_event
end
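Roughly, each formatted event is a JSON object of this shape. A minimal standalone sketch, with illustrative tag, record, sourcetype, and index values, and placeholder expansion omitted:

```ruby
require 'json'
require 'socket'

# Minimal sketch of the per-event JSON object format produces
# (values here are illustrative, not plugin defaults).
tag    = 'app.access'
time   = Time.at(1_700_000_000)
record = { 'message' => 'GET /index.html 200' }

splunk_object = {
  'time'       => time.to_i,
  'source'     => tag,                # default source is the tag
  'sourcetype' => 'fluentd',
  'host'       => Socket.gethostname,
  'index'      => 'main',
  'event'      => record['message']   # all_items false: message field only
}
puts splunk_object.to_json
```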
#numfmt(input) ⇒ Object
Format a number with comma thousands separators, for readable log messages.
# File 'lib/fluent/plugin/out_splunk-http-eventcollector.rb', line 283

def numfmt(input)
  input.to_s.reverse.gsub(/(\d{3})(?=\d)/, '\1,').reverse
end
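numfmt pretty-prints integers for log output via a reverse/gsub/reverse trick: reversed, every group of three digits that is still followed by a digit gets a comma. A self-contained copy shows the effect:

```ruby
# Self-contained copy of the plugin's numfmt helper.
def numfmt(input)
  input.to_s.reverse.gsub(/(\d{3})(?=\d)/, '\1,').reverse
end

numfmt(1234567)  # => "1,234,567"
numfmt(512)      # => "512"
```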
#push_buffer(body) ⇒ Object
POST a request body to the Splunk HTTP Event Collector, retrying up to post_retry_max times on server errors.
# File 'lib/fluent/plugin/out_splunk-http-eventcollector.rb', line 241

def push_buffer(body)
  post = Net::HTTP::Post.new @splunk_uri.request_uri
  post.body = body
  log.debug "POST #{@splunk_uri}"
  if @test_mode
    log.debug "TEST_MODE Payload: #{body}"
    return
  end
  # retry up to :post_retry_max times
  1.upto(@post_retry_max) do |c|
    response = @http.request @splunk_uri, post
    log.debug "=>(#{c}/#{numfmt(@post_retry_max)}) #{response.code} " +
        "(#{response.message})"
    # TODO check the actual server response too (it's JSON)
    if response.code == "200" # and...
      # success
      break
    # TODO check 40X response within post_retry_max and retry
    elsif response.code.match(/^50/) and c < @post_retry_max
      # retry
      log.warn "#{@splunk_uri}: Server error #{response.code} (" +
          "#{response.message}). Retrying in #{@post_retry_interval} " +
          "seconds.\n#{response.body}"
      sleep @post_retry_interval
      next
    elsif response.code.match(/^40/)
      # user error
      log.error "#{@splunk_uri}: #{response.code} (#{response.message})\n#{response.body}"
      break
    elsif c < @post_retry_max
      # retry
      log.debug "#{@splunk_uri}: Retrying..."
      sleep @post_retry_interval
      next
    else
      # other errors. fluentd will retry processing on exception
      # FIXME: this may duplicate logs when using multiple buffers
      raise "#{@splunk_uri}: #{response.message}\n#{response.body}"
    end # If response.code
  end # 1.upto(@post_retry_max)
end
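The retry policy above can be sketched as a standalone loop with the HTTP call injected as a callable, so the flow is exercisable without a server. `post_with_retries` and the fake responses are illustrative, not plugin API:

```ruby
# Standalone sketch of push_buffer's retry policy: 200 succeeds, 50x
# retries until the attempt budget runs out, 40x never retries, and
# exhausting the budget raises (so fluentd retries the whole chunk).
def post_with_retries(post_retry_max, post_retry_interval, request)
  1.upto(post_retry_max) do |c|
    code = request.call
    return :ok if code == "200"               # success: stop
    if code.start_with?("50") && c < post_retry_max
      sleep post_retry_interval               # transient server error: retry
    elsif code.start_with?("40")
      return :client_error                    # user error: never retry
    elsif c < post_retry_max
      sleep post_retry_interval               # anything else: retry while we can
    else
      raise "giving up after #{c} attempts (HTTP #{code})"
    end
  end
end

responses = ["503", "503", "200"]
post_with_retries(5, 0, -> { responses.shift })  # => :ok after two retries
```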
#shutdown ⇒ Object
This method is called when shutting down. Shutdown the thread and close sockets or files here.
# File 'lib/fluent/plugin/out_splunk-http-eventcollector.rb', line 137

def shutdown
  super
  log.trace "splunk-http-eventcollector(shutdown) called"
  @http.shutdown
  log.trace "shutdown from splunk-http-eventcollector"
end
#start ⇒ Object
This method is called when starting. Open sockets or files here.
# File 'lib/fluent/plugin/out_splunk-http-eventcollector.rb', line 123

def start
  super
  log.trace "splunk-http-eventcollector(start) called"
  @http = Net::HTTP::Persistent.new 'fluent-plugin-splunk-http-eventcollector'
  @http.verify_mode = OpenSSL::SSL::VERIFY_NONE unless @verify
  @http.override_headers['Content-Type'] = 'application/json'
  @http.override_headers['User-Agent'] = 'fluent-plugin-splunk-http-eventcollector/0.0.1'
  @http.override_headers['Authorization'] = "Splunk #{@token}"
  log.trace "initialized for splunk-http-eventcollector"
end
#write(chunk) ⇒ Object
By this point, fluentd has decided its buffer is full and it's time to flush it. chunk.read is a concatenated string of JSON.to_s objects. Simply POST them to Splunk and go about our life. This method is called every flush interval. Write the buffer chunk to files or databases here. 'chunk' is a buffer chunk that includes multiple formatted events. You can use 'data = chunk.read' to get all events and 'chunk.open {|io| ... }' to get IO objects.
NOTE! This method is called by internal thread, not Fluentd’s main thread. So IO wait doesn’t affect other plugins.
# File 'lib/fluent/plugin/out_splunk-http-eventcollector.rb', line 190

def write(chunk)
  log.trace "splunk-http-eventcollector(write) called"

  # Break the concatenated string of JSON-formatted events into an Array
  split_chunk = chunk.read.split("}{").each do |x|
    # Reconstruct the opening{/closing} that #split() strips off.
    x.prepend("{") unless x.start_with?("{")
    x << "}" unless x.end_with?("}")
  end
  log.debug "Pushing #{numfmt(split_chunk.size)} events (" +
      "#{numfmt(chunk.read.bytesize)} bytes) to Splunk."
  # If fluentd is pushing too much data to Splunk at once, split up the payload
  # Don't care about the number of events so much as the POST size (bytes)
  #if split_chunk.size > @batch_event_limit
  #  log.warn "Fluentd is attempting to push #{numfmt(split_chunk.size)} " +
  #      "events in a single push to Splunk. The configured limit is " +
  #      "#{numfmt(@batch_event_limit)}."
  #end
  if chunk.read.bytesize > @batch_size_limit
    log.warn "Fluentd is attempting to push #{numfmt(chunk.read.bytesize)} " +
        "bytes in a single push to Splunk. The configured limit is " +
        "#{numfmt(@batch_size_limit)} bytes."
    newbuffer = Array.new
    split_chunk_counter = 0
    split_chunk.each do |c|
      split_chunk_counter = split_chunk_counter + 1
      #log.debug "(#{numfmt(split_chunk_counter)}/#{numfmt(split_chunk.size)}) " +
      #    "newbuffer.bytesize=#{numfmt(newbuffer.join.bytesize)} + " +
      #    "c.bytesize=#{numfmt(c.bytesize)} ????"
      if newbuffer.join.bytesize + c.bytesize < @batch_size_limit
        #log.debug "Appended!"
        newbuffer << c
      else
        # Reached the limit - push the current newbuffer.join, and reset
        #log.debug "Would exceed limit. Flushing newbuffer and continuing."
        log.debug "(#{numfmt(split_chunk_counter)}/#{numfmt(split_chunk.size)}) " +
            "newbuffer.bytesize=#{numfmt(newbuffer.join.bytesize)} + " +
            "c.bytesize=#{numfmt(c.bytesize)} > #{numfmt(@batch_size_limit)}, " +
            "flushing current buffer to Splunk."
        push_buffer newbuffer.join
        newbuffer = Array c  # start the next buffer with the event that didn't fit
      end # if/else buffer fits limit
    end # split_chunk.each
    # Push anything left over.
    push_buffer newbuffer.join if newbuffer.size
    return
  else
    return push_buffer chunk.read
  end # if chunk.read.bytesize > @batch_size_limit
end
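The chunk-splitting step can be exercised on its own: splitting the concatenated JSON string on "}{" drops a brace from each side of every boundary, and the loop restores them. (This simple split would misfire if an event body itself contained a literal "}{".)

```ruby
# Formatted events arrive concatenated into one string, e.g. from
# chunk.read; the values here are illustrative.
chunk_data = '{"event":"a"}{"event":"b"}{"event":"c"}'

split_chunk = chunk_data.split("}{").each do |x|
  # Reconstruct the opening{/closing} that #split() strips off.
  x.prepend("{") unless x.start_with?("{")
  x << "}" unless x.end_with?("}")
end

split_chunk  # => ['{"event":"a"}', '{"event":"b"}', '{"event":"c"}']
```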