Class: LogStash::Outputs::InfluxDB
- Inherits: LogStash::Outputs::Base
  - Object
  - LogStash::Outputs::Base
  - LogStash::Outputs::InfluxDB
- Includes:
- Stud::Buffer
- Defined in:
- lib/logstash/outputs/influxdb.rb
Overview
This output lets you write metrics to InfluxDB (>= 0.9.0-rc31).
The configuration attempts to be as friendly as possible and to minimize the need for multiple definitions when writing to multiple measurements, while still being efficient: the InfluxDB API allows some semblance of bulk operation per HTTP call, but each call is database-specific.
You can learn more at the InfluxDB homepage (influxdb.com).
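As a hedged illustration of how this output is typically configured (option names correspond to the settings referenced by the methods below; the database, measurement, and field names are hypothetical), a minimal pipeline output block might look like:

```
output {
  influxdb {
    host          => "localhost"
    db            => "metrics"
    measurement   => "cpu_load_short"
    # hypothetical field mapping; keys and values depend on your events
    data_points   => { "value" => "%{value}" }
    coerce_values => { "value" => "float" }
    send_as_tags  => ["host", "region"]
  }
}
```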
Instance Method Summary
- #close ⇒ Object
- #coerce_value(value_type, value) ⇒ Object
-
#coerce_values!(event_data) ⇒ Object
Coerce values in the event data to their appropriate type.
-
#create_point_from_event(event) ⇒ Object
Create a data point from an event.
-
#dowrite(events, database) ⇒ Object
Write a batch of events to InfluxDB; called by #flush.
-
#exclude_fields!(event_data) ⇒ Object
Remove a set of fields from the event data before sending it to InfluxDB.
-
#extract_tags(fields) ⇒ Object
Extract tags from a hash of fields.
- #flush(events, database, teardown = false) ⇒ Object
- #receive(event) ⇒ Object
- #register ⇒ Object
-
#timestamp_at_precision(timestamp, precision) ⇒ Object
Returns the numeric value of the given timestamp in the requested precision.
Instance Method Details
#close ⇒ Object
# File 'lib/logstash/outputs/influxdb.rb', line 200

def close
  buffer_flush(:final => true)
end
#coerce_value(value_type, value) ⇒ Object
# File 'lib/logstash/outputs/influxdb.rb', line 237

def coerce_value(value_type, value)
  case value_type.to_sym
  when :integer
    value.to_i
  when :float
    value.to_f
  when :string
    value.to_s
  else
    @logger.warn("Don't know how to convert to #{value_type}. Returning value unchanged")
    value
  end
end
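The coercion logic can be exercised in isolation. The sketch below reproduces the case statement (with the logger call omitted) so each branch's behavior is visible:

```ruby
# Standalone sketch of the coerce_value branches (logger omitted).
# Unknown types fall through and return the value unchanged.
def coerce_value(value_type, value)
  case value_type.to_sym
  when :integer then value.to_i
  when :float   then value.to_f
  when :string  then value.to_s
  else value # the real method also logs a warning here
  end
end

coerce_value(:integer, "42")   # => 42
coerce_value(:float, "0.64")   # => 0.64
coerce_value(:string, 99)      # => "99"
coerce_value(:boolean, "true") # => "true" (unchanged)
```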
#coerce_values!(event_data) ⇒ Object
Coerce values in the event data to their appropriate type. This requires foreknowledge of what's in the data point, which is less than ideal. An alternative is to use a `code` filter and manipulate the individual point's data before sending it to the output pipeline.
# File 'lib/logstash/outputs/influxdb.rb', line 220

def coerce_values!(event_data)
  @coerce_values.each do |column, value_type|
    if event_data.has_key?(column)
      begin
        @logger.debug? and @logger.debug("Converting column #{column} to type #{value_type}: Current value: #{event_data[column]}")
        event_data[column] = coerce_value(value_type, event_data[column])
      rescue => e
        @logger.error("Unhandled exception", :error => e.message)
      end
    end
  end

  event_data
end
#create_point_from_event(event) ⇒ Object
Create a data point from an event. If @use_event_fields_for_data_points is true, convert the event to a hash. Otherwise, use @data_points. Each key and value will be run through event#sprintf with the exception of a non-String value (which will be passed through)
# File 'lib/logstash/outputs/influxdb.rb', line 209

def create_point_from_event(event)
  Hash[ (@use_event_fields_for_data_points ? event.to_hash : @data_points).map do |k,v|
    [event.sprintf(k), (String === v ? event.sprintf(v) : v)]
  end ]
end
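To show the key/value mapping without a full Logstash runtime, the sketch below uses a hypothetical MockEvent whose simplified sprintf only substitutes %{field} references from the event's own data; the Hash construction mirrors create_point_from_event:

```ruby
# Hypothetical stand-in for a Logstash event, for illustration only.
class MockEvent
  def initialize(data)
    @data = data
  end

  # Simplified sprintf: substitute %{field} references from the event data.
  def sprintf(s)
    s.gsub(/%\{(\w+)\}/) { @data[Regexp.last_match(1)].to_s }
  end
end

data_points = { "host" => "%{host}", "value" => 0.64 }
event = MockEvent.new("host" => "server01")

# Keys are always run through sprintf; values only when they are Strings.
point = Hash[ data_points.map { |k, v| [event.sprintf(k), (String === v ? event.sprintf(v) : v)] } ]
# => {"host" => "server01", "value" => 0.64}
```

Note how the non-String value (0.64) passes through untouched while the String value is interpolated.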
#dowrite(events, database) ⇒ Object
Write the batch of events to InfluxDB; called by #flush.
# File 'lib/logstash/outputs/influxdb.rb', line 188

def dowrite(events, database)
  begin
    @influxdbClient.write_points(events, @time_precision, @retention_policy, @db)
  rescue InfluxDB::AuthenticationError => ae
    @logger.warn("Authentication Error while writing to InfluxDB", :exception => ae)
  rescue InfluxDB::ConnectionError => ce
    @logger.warn("Connection Error while writing to InfluxDB", :exception => ce)
  rescue Exception => e
    @logger.warn("Non recoverable exception while writing to InfluxDB", :exception => e)
  end
end
#exclude_fields!(event_data) ⇒ Object
Remove a set of fields from the event data before sending it to InfluxDB. This is useful for removing @timestamp, @version, etc.
# File 'lib/logstash/outputs/influxdb.rb', line 257

def exclude_fields!(event_data)
  @exclude_fields.each { |field| event_data.delete(field) }
end
#extract_tags(fields) ⇒ Object
Extract tags from a hash of fields. Returns a tuple containing a hash of tags (as configured by send_as_tags) and a hash of fields that exclude the tags. If fields contains a key “tags” with an array, they will be moved to the tags hash (and each will be given a value of true)
Example:
  # Given send_as_tags: ["bar"]
  original_fields = {"foo" => 1, "bar" => 2, "tags" => ["tag"]}
  tags, fields = extract_tags(original_fields)
  # tags: {"bar" => 2, "tag" => "true"} and fields: {"foo" => 1}
# File 'lib/logstash/outputs/influxdb.rb', line 273

def extract_tags(fields)
  remainder = fields.dup

  tags = if remainder.has_key?("tags") && remainder["tags"].respond_to?(:inject)
    remainder.delete("tags").inject({}) { |tags, tag| tags[tag] = "true"; tags }
  else
    {}
  end

  @send_as_tags.each { |key| (tags[key] = remainder.delete(key)) if remainder.has_key?(key) }

  tags.delete_if { |key,value| value.nil? || value == "" }
  remainder.delete_if { |key,value| value.nil? || value == "" }

  [tags, remainder]
end
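The documented example can be checked with a standalone reproduction of this logic; the only change is that the send_as_tags list is passed as an argument instead of being read from instance state:

```ruby
# Standalone sketch of extract_tags, with @send_as_tags passed in explicitly.
def extract_tags(fields, send_as_tags)
  remainder = fields.dup

  # A "tags" array in the fields becomes tag keys, each with value "true".
  tags = if remainder.has_key?("tags") && remainder["tags"].respond_to?(:inject)
    remainder.delete("tags").inject({}) { |acc, tag| acc[tag] = "true"; acc }
  else
    {}
  end

  # Fields configured as tags move from the field hash to the tag hash.
  send_as_tags.each { |key| (tags[key] = remainder.delete(key)) if remainder.has_key?(key) }

  # Drop nil/empty entries from both hashes.
  tags.delete_if      { |_, v| v.nil? || v == "" }
  remainder.delete_if { |_, v| v.nil? || v == "" }

  [tags, remainder]
end

tags, fields = extract_tags({ "foo" => 1, "bar" => 2, "tags" => ["tag"] }, ["bar"])
# tags   => {"tag" => "true", "bar" => 2}
# fields => {"foo" => 1}
```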
#flush(events, database, teardown = false) ⇒ Object
# File 'lib/logstash/outputs/influxdb.rb', line 183

def flush(events, database, teardown = false)
  @logger.debug? and @logger.debug("Flushing #{events.size} events to #{database} - Teardown? #{teardown}")
  dowrite(events, database)
end
#receive(event) ⇒ Object
# File 'lib/logstash/outputs/influxdb.rb', line 145

def receive(event)
  @logger.debug? and @logger.debug("Influxdb output: Received event: #{event}")

  # An Influxdb 0.9 event looks like this:
  # cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000
  #  ^ measurement  ^ tags (optional)            ^ fields   ^ timestamp (optional)
  #
  # Since we'll be buffering them to send as a batch, we'll only collect
  # the values going into the points array

  time  = timestamp_at_precision(event.timestamp, @time_precision.to_sym)
  point = create_point_from_event(event)

  exclude_fields!(point)
  coerce_values!(point)

  if point.has_key?('time')
    unless @allow_time_override
      logger.error("Cannot override value of time without 'allow_time_override'. Using event timestamp")
    else
      time = point.delete("time")
    end
  end

  tags, point = extract_tags(point)

  event_hash = {
    :series    => event.sprintf(@measurement),
    :timestamp => time,
    :values    => point
  }
  event_hash[:tags] = tags unless tags.empty?

  buffer_receive(event_hash, event.sprintf(@db))
end
#register ⇒ Object
# File 'lib/logstash/outputs/influxdb.rb', line 128

def register
  require 'cgi'
  @queue = []

  buffer_initialize(
    :max_items => @flush_size,
    :max_interval => @idle_flush_time,
    :logger => @logger
  )

  @auth_method = @user.nil? ? 'none'.freeze : "params".freeze
  @influxdbClient = InfluxDB::Client.new host: @host, port: @port,
    time_precision: @time_precision, use_ssl: @ssl, verify_ssl: false,
    retry: @max_retries, initial_delay: @initial_delay,
    auth_method: @auth_method, username: @user, password: @password.value
end
#timestamp_at_precision(timestamp, precision) ⇒ Object
Returns the numeric value of the given timestamp in the requested precision. precision must be one of the valid values for time_precision
# File 'lib/logstash/outputs/influxdb.rb', line 293

def timestamp_at_precision( timestamp, precision )
  multiplier = case precision
    when :h  then 1.0/3600
    when :m  then 1.0/60
    when :s  then 1
    when :ms then 1000
    when :u  then 1000000
    when :n  then 1000000000
  end

  (timestamp.to_f * multiplier).to_i
end
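A few worked conversions make the scaling concrete. The sketch below is a standalone copy of the method; the input timestamp is assumed to be in seconds (e.g. a Unix epoch float), and it is scaled up or down to the target precision:

```ruby
# Standalone copy of the precision conversion: a timestamp in seconds is
# scaled to hours, minutes, seconds, milli-, micro-, or nanoseconds.
def timestamp_at_precision(timestamp, precision)
  multiplier = case precision
    when :h  then 1.0 / 3600
    when :m  then 1.0 / 60
    when :s  then 1
    when :ms then 1_000
    when :u  then 1_000_000
    when :n  then 1_000_000_000
  end

  (timestamp.to_f * multiplier).to_i
end

timestamp_at_precision(1434055562.0, :s)  # => 1434055562
timestamp_at_precision(1434055562.0, :ms) # => 1434055562000
timestamp_at_precision(7200, :h)          # => 2
```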