Class: LogStash::Outputs::InfluxDB

Inherits:
Base
  • Object
show all
Includes:
Stud::Buffer
Defined in:
lib/logstash/outputs/influxdb.rb

Overview

This output lets you output Metrics to InfluxDB (>= 0.9.0-rc31)

The configuration here attempts to be as friendly as possible and to minimize the need for multiple definitions when writing to multiple measurements, while still being efficient.

The InfluxDB API lets you perform some semblance of a bulk operation per HTTP call, but each call is database-specific.

You can learn more at the InfluxDB homepage (influxdb.com).

Instance Method Summary collapse

Instance Method Details

#closeObject



200
201
202
# File 'lib/logstash/outputs/influxdb.rb', line 200

# Performs a last flush of the buffer by calling buffer_flush with
# :final => true, and returns whatever buffer_flush returns.
def close
  buffer_flush(final: true)
end

#coerce_value(value_type, value) ⇒ Object



237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
# File 'lib/logstash/outputs/influxdb.rb', line 237

# Convert +value+ to the type named by +value_type+.
#
# value_type - object responding to to_sym; :integer, :float and :string
#              are the recognised targets
# value      - the value to convert (via to_i, to_f or to_s respectively)
#
# Returns the converted value. For any other type name a warning is logged
# and +value+ is handed back untouched.
def coerce_value(value_type, value)
  target = value_type.to_sym

  return value.to_i if target == :integer
  return value.to_f if target == :float
  return value.to_s if target == :string

  @logger.warn("Don't know how to convert to #{value_type}. Returning value unchanged")
  value
end

#coerce_values!(event_data) ⇒ Object

Coerce values in the event data to their appropriate type. This requires foreknowledge of what's in the data point, which is less than ideal. An alternative is to use a `code` filter and manipulate the individual point's data before sending it to the output pipeline.



220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# File 'lib/logstash/outputs/influxdb.rb', line 220

# Apply the conversions configured in @coerce_values to +event_data+ in place.
#
# @coerce_values is iterated as (field name, type name) pairs. Fields that
# are absent from +event_data+ are skipped. A failure while converting one
# field is logged as an error and does not stop the remaining fields from
# being processed.
#
# Returns the same +event_data+ object that was passed in.
def coerce_values!(event_data)
  @coerce_values.each do |field, type_name|
    next unless event_data.has_key?(field)

    begin
      if @logger.debug?
        @logger.debug("Converting column #{field} to type #{type_name}: Current value: #{event_data[field]}")
      end
      event_data[field] = coerce_value(type_name, event_data[field])
    rescue => err
      @logger.error("Unhandled exception", :error => err.message)
    end
  end

  event_data
end

#create_point_from_event(event) ⇒ Object

Create a data point from an event. If @use_event_fields_for_data_points is true, convert the event to a hash. Otherwise, use @data_points. Each key and value will be run through event#sprintf with the exception of a non-String value (which will be passed through)



209
210
211
212
213
# File 'lib/logstash/outputs/influxdb.rb', line 209

# Build the data point hash for +event+.
#
# The source pairs come from event.to_hash when
# @use_event_fields_for_data_points is truthy, otherwise from @data_points.
# Every key is passed through event.sprintf; values are passed through
# event.sprintf only when they are Strings and are otherwise kept as they are.
#
# Returns a new Hash.
def create_point_from_event(event)
  source = @use_event_fields_for_data_points ? event.to_hash : @data_points

  source.each_with_object({}) do |(name, raw), point|
    point[event.sprintf(name)] = raw.is_a?(String) ? event.sprintf(raw) : raw
  end
end

#dowrite(events, database) ⇒ Object

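Writes the given batch of events to InfluxDB through the client, logging a warning if the write fails. Called by flush.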



188
189
190
191
192
193
194
195
196
197
198
# File 'lib/logstash/outputs/influxdb.rb', line 188

# Send one batch of points to InfluxDB with a single write_points call.
#
# events   - the points to write
# database - name of the database the batch is written to
#
# Errors raised by the client (StandardError and subclasses) are logged as
# warnings and swallowed, so a failed write never propagates to the caller.
# Non-StandardError exceptions (signals, exit requests, ...) are deliberately
# NOT rescued so they can still stop the process.
#
# Returns the client's write_points result, or the logger's return value
# when an error was rescued.
def dowrite(events, database)
  # Write to the database passed by the caller, not the raw @db setting:
  # receive groups points by event.sprintf(@db), so @db itself may still
  # contain unexpanded field references.
  @influxdbClient.write_points(events, @time_precision, @retention_policy, database)
rescue InfluxDB::AuthenticationError => ae
  @logger.warn("Authentication Error while writing to InfluxDB", :exception => ae)
rescue InfluxDB::ConnectionError => ce
  @logger.warn("Connection Error while writing to InfluxDB", :exception => ce)
rescue StandardError => e
  @logger.warn("Non recoverable exception while writing to InfluxDB", :exception => e)
end

#exclude_fields!(event_data) ⇒ Object

Remove a set of fields from the event data before sending it to Influxdb. This is useful for removing @timestamp, @version, etc



257
258
259
# File 'lib/logstash/outputs/influxdb.rb', line 257

# Delete every key listed in @exclude_fields from +event_data+, in place.
# Keys that are not present are simply ignored by the delete call.
#
# Returns @exclude_fields (the result of iterating it with each).
def exclude_fields!(event_data)
  @exclude_fields.each do |unwanted|
    event_data.delete(unwanted)
  end
end

#extract_tags(fields) ⇒ Object

Extract tags from a hash of fields. Returns a tuple containing a hash of tags (as configured by send_as_tags) and a hash of fields that exclude the tags. If fields contains a key “tags” with an array, they will be moved to the tags hash (and each will be given a value of true)

Example:

# Given send_as_tags: ["bar"]
original_fields = {"foo" => 1, "bar" => 2, "tags" => ["tag"]}
tags, fields = extract_tags(original_fields)
# tags: {"bar" => 2, "tag" => "true"} and fields: {"foo" => 1}


273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
# File 'lib/logstash/outputs/influxdb.rb', line 273

# Split +fields+ into a tags hash and a hash of the remaining fields.
#
# Works on a shallow copy, so +fields+ itself is not modified.
#
# * When the "tags" entry responds to inject, it is removed from the
#   remaining fields and each of its elements becomes a tag with the
#   value "true".
# * Every key named in @send_as_tags that is present is moved from the
#   remaining fields into the tags.
# * Entries whose value is nil or "" are dropped from both hashes.
#
# Returns a two-element Array: [tags, remaining_fields].
def extract_tags(fields)
  rest = fields.dup
  tags = {}

  if rest.has_key?("tags") && rest["tags"].respond_to?(:inject)
    tags = rest.delete("tags").inject(tags) do |acc, name|
      acc[name] = "true"
      acc
    end
  end

  @send_as_tags.each do |key|
    tags[key] = rest.delete(key) if rest.has_key?(key)
  end

  # Prune empty values from the tags first, then from the remaining fields.
  [tags, rest].each do |hash|
    hash.delete_if { |_key, val| val.nil? || val == "" }
  end

  [tags, rest]
end

#flush(events, database, teardown = false) ⇒ Object



183
184
185
186
# File 'lib/logstash/outputs/influxdb.rb', line 183

# Hand a batch of events over to dowrite.
#
# events   - the batch to write; its size is reported in the debug log
# database - passed through to dowrite unchanged
# teardown - only used in the debug message (defaults to false)
#
# Returns whatever dowrite returns.
def flush(events, database, teardown = false)
  if @logger.debug?
    @logger.debug("Flushing #{events.size} events to #{database} - Teardown? #{teardown}")
  end

  dowrite(events, database)
end

#receive(event) ⇒ Object



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/logstash/outputs/influxdb.rb', line 145

# Turn +event+ into an InfluxDB point and queue it for the next batch write.
#
# The point is built with create_point_from_event, then passed through
# exclude_fields! and coerce_values!. Tags are split off with extract_tags.
# The resulting hash is queued with buffer_receive, grouped by the
# interpolated value of @db.
#
# A "time" field in the point replaces the event timestamp only when
# @allow_time_override is set; otherwise an error is logged, the event
# timestamp is used, and the "time" field is left in the point.
def receive(event)
  @logger.debug("Influxdb output: Received event: #{event}") if @logger.debug?

  # An Influxdb 0.9 event looks like this:
  # cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000
  #  ^ measurement  ^ tags (optional)            ^ fields   ^ timestamp (optional)
  #
  # Since we'll be buffering them to send as a batch, we'll only collect
  # the values going into the points array

  time  = timestamp_at_precision(event.timestamp, @time_precision.to_sym)
  point = create_point_from_event(event)
  exclude_fields!(point)
  coerce_values!(point)

  if point.has_key?('time')
    if @allow_time_override
      time = point.delete("time")
    else
      # Log through @logger like every other call in this class; a bare
      # `logger` relies on a method that is not guaranteed to exist.
      @logger.error("Cannot override value of time without 'allow_time_override'. Using event timestamp")
    end
  end

  tags, point = extract_tags(point)

  event_hash = {
    :series    => event.sprintf(@measurement),
    :timestamp => time,
    :values    => point
  }
  # Only attach :tags when there is at least one tag.
  event_hash[:tags] = tags unless tags.empty?

  buffer_receive(event_hash, event.sprintf(@db))
end

#registerObject



128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/logstash/outputs/influxdb.rb', line 128

# Set up the output: initialise the event buffer and create the InfluxDB
# client from the plugin's settings.
#
# The buffer is configured with @flush_size as :max_items and
# @idle_flush_time as :max_interval. Authentication uses the 'params' method
# when @user is set and 'none' otherwise.
def register
  require 'cgi'

  @queue = []

  buffer_initialize(
    :max_items => @flush_size,
    :max_interval => @idle_flush_time,
    :logger => @logger
  )

  @auth_method = @user.nil? ? 'none'.freeze : "params".freeze

  # @password may be unset when no credentials are configured; calling
  # .value on nil would abort registration with a NoMethodError.
  password = @password.nil? ? nil : @password.value

  # NOTE(review): verify_ssl is hard-coded to false, so the server
  # certificate is not verified even when use_ssl is enabled — confirm this
  # is intended.
  @influxdbClient = InfluxDB::Client.new(
    host: @host,
    port: @port,
    time_precision: @time_precision,
    use_ssl: @ssl,
    verify_ssl: false,
    retry: @max_retries,
    initial_delay: @initial_delay,
    auth_method: @auth_method,
    username: @user,
    password: password
  )
end

#timestamp_at_precision(timestamp, precision) ⇒ Object

Returns the numeric value of the given timestamp in the requested precision. precision must be one of the valid values for time_precision



293
294
295
296
297
298
299
300
301
302
303
304
# File 'lib/logstash/outputs/influxdb.rb', line 293

# Convert +timestamp+ to an Integer in the requested precision.
#
# timestamp - object responding to to_f; presumably seconds since the epoch,
#             since the :s multiplier is 1 (confirm against the caller)
# precision - one of :h, :m, :s, :ms, :u, :n; the equivalent String is
#             accepted as well
#
# The float value is multiplied by the precision's factor and truncated
# with to_i.
#
# Raises ArgumentError when +precision+ is not one of the supported values
# (previously this surfaced as an opaque TypeError from multiplying by nil).
def timestamp_at_precision( timestamp, precision )
  unit = precision.respond_to?(:to_sym) ? precision.to_sym : precision

  multiplier = case unit
    when :h  then 1.0/3600
    when :m  then 1.0/60
    when :s  then 1
    when :ms then 1_000
    when :u  then 1_000_000
    when :n  then 1_000_000_000
    else
      raise ArgumentError, "Unsupported time precision: #{precision.inspect}"
  end

  (timestamp.to_f * multiplier).to_i
end