Class: Fluent::Plugin::SplunkHECOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::Plugin::SplunkHECOutput
- Defined in:
- lib/fluent/plugin/out_splunkhec.rb
Constant Summary collapse
- DEFAULT_BUFFER_TYPE =
"memory"
Instance Method Summary collapse
-
#configure(conf) ⇒ Object
This method is called before starting.
- #expand_param(param, tag, time, record) ⇒ Object
-
#format(tag, time, record) ⇒ Object
This method is called when an event reaches to Fluentd.
- #formatted_to_msgpack_binary? ⇒ Boolean
- #multi_workers_ready? ⇒ Boolean
- #send_to_splunk(body, token) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
-
#write(chunk) ⇒ Object
Loop through all records and sent them to Splunk.
Instance Method Details
#configure(conf) ⇒ Object
This method is called before starting. Here we construct the Splunk HEC URL to POST data to If the configuration is invalid, raise Fluent::ConfigError.
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/fluent/plugin/out_splunkhec.rb', line 35 def configure(conf) compat_parameters_convert(conf, :buffer) super @splunk_url = @protocol + '://' + @host + ':' + @port + '/services/collector/event' log.info 'splunkhec: sending data to ' + @splunk_url if conf['event_host'] == nil begin @event_host = `hostname`.delete!("\n") rescue @event_host = 'unknown' end end @packer = Fluent::Engine.msgpack_factory.packer end |
#expand_param(param, tag, time, record) ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/fluent/plugin/out_splunkhec.rb', line 73 def (param, tag, time, record) # check for '${ ... }' # yes => `eval` # no => return param return param if (param =~ /\${.+}/).nil? # check for 'tag_parts[]' # separated by a delimiter (default '.') tag_parts = tag.split(@delimiter) unless (param =~ /tag_parts\[.+\]/).nil? || tag.nil? # pull out section between ${} then eval inner = param.clone while inner.match(/\${.+}/) to_eval = inner.match(/\${(.+?)}/){$1} if !(to_eval =~ /record\[.+\]/).nil? && record.nil? return to_eval elsif !(to_eval =~/tag_parts\[.+\]/).nil? && tag_parts.nil? return to_eval elsif !(to_eval =~/time/).nil? && time.nil? return to_eval else inner.sub!(/\${.+?}/, eval( to_eval )) end end inner end |
#format(tag, time, record) ⇒ Object
This method is called when an event reaches to Fluentd. Use msgpack to serialize the object.
69 70 71 |
# File 'lib/fluent/plugin/out_splunkhec.rb', line 69 def format(tag, time, record) @packer.pack([tag, time, record]).to_s end |
#formatted_to_msgpack_binary? ⇒ Boolean
59 60 61 |
# File 'lib/fluent/plugin/out_splunkhec.rb', line 59 def formatted_to_msgpack_binary? true end |
#multi_workers_ready? ⇒ Boolean
63 64 65 |
# File 'lib/fluent/plugin/out_splunkhec.rb', line 63 def multi_workers_ready? true end |
#send_to_splunk(body, token) ⇒ Object
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 |
# File 'lib/fluent/plugin/out_splunkhec.rb', line 159 def send_to_splunk(body, token) log.debug "splunkhec: " + body + "\n" uri = URI(@splunk_url) # Create client http = Net::HTTP.new(uri.host, uri.port) http.set_debug_output(log.debug) # Create request req = Net::HTTP::Post.new(uri, "Content-Type" => "application/json; charset=utf-8", "Authorization" => "Splunk #{token}") req.body = body # Handle SSL if @protocol == 'https' http.use_ssl = true end # Send Request res = http.request(req) log.debug "splunkhec: HTTP Response Status Code is #{res.code}" if res.code.to_i != 200 body = JSON.parse(res.body) raise SplunkHECOutputError.new(body['text'], body['code'], body['invalid-event-number'], res.code) end end |
#shutdown ⇒ Object
55 56 57 |
# File 'lib/fluent/plugin/out_splunkhec.rb', line 55 def shutdown super end |
#start ⇒ Object
51 52 53 |
# File 'lib/fluent/plugin/out_splunkhec.rb', line 51 def start super end |
#write(chunk) ⇒ Object
Loop through all records and sent them to Splunk
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
# File 'lib/fluent/plugin/out_splunkhec.rb', line 102 def write(chunk) body = '' chunk.msgpack_each {|(tag,time,record)| # define index and sourcetype dynamically begin index = (@index, tag, time, record) sourcetype = (@sourcetype, tag, time, record) event_host = (@event_host, tag, time, record) token = (@token, tag, time, record) rescue => e # handle dynamic parameters misconfigurations router.emit_error_event(tag, time, record, e) next end log.debug "routing event from #{event_host} to #{index} index" log.debug "expanded token #{token}" # Parse record to Splunk event format case record when Integer event = record.to_s when Hash if @send_event_as_json event = record.to_json else event = record.to_json.gsub("\"", %q(\\\")) end else event = record end sourcetype = @sourcetype == 'tag' ? tag : @sourcetype # Build body for the POST request if !@usejson event = record["time"]+ " " + record["message"].to_json.gsub(/^"|"$/,"") body << '{"time":"'+ DateTime.parse(record["time"]).strftime("%Q") +'", "event":"' + event + '", "sourcetype" :"' + sourcetype + '", "source" :"' + @source + '", "index" :"' + index + '", "host" : "' + event_host + '"}' elsif @send_event_as_json body << '{"time" :' + time.to_s + ', "event" :' + event + ', "sourcetype" :"' + sourcetype + '", "source" :"' + source + '", "index" :"' + index + '", "host" : "' + event_host + '"}' else body << '{"time" :' + time.to_s + ', "event" :"' + event + '", "sourcetype" :"' + sourcetype + '", "source" :"' + source + '", "index" :"' + index + '", "host" : "' + event_host + '"}' end if @send_batched_events body << "\n" else send_to_splunk(body, token) body = '' end } if @send_batched_events send_to_splunk(body, token) end end |