Class: LogStash::Outputs::InfluxDB2

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/outputs/influxdb2.rb

Defined Under Namespace

Classes: ToStr

Instance Method Summary collapse

Instance Method Details

#_to_kwargs(hash) ⇒ Object



56
57
58
59
60
61
62
63
64
# File 'lib/logstash/outputs/influxdb2.rb', line 56

def _to_kwargs(hash)
  hash.map do |k,v|
    case v
    when "true"  then v = true
    when "false" then v = false
    end
    [k.to_sym, v]
  end.to_h
end

#closeObject

def event



52
53
54
# File 'lib/logstash/outputs/influxdb2.rb', line 52

def close
  @client.close!
end

#receive(event) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/logstash/outputs/influxdb2.rb', line 31

def receive(event)
  fields = event.get(@fields)
  return unless fields.is_a?(Hash) && ! fields.empty?

  tags = @tags.nil? ? nil : event.get(@tags)
  return unless tags.nil? || tags.is_a?(Hash)

  unless @escape_value
    fields = fields.transform_values { |v| ToStr.new(v) }
  end

  @write_api.write(data: InfluxDB2::Point.new(
    name: event.sprintf(@measurement), tags: tags, fields: fields,
    time: event.timestamp.time, precision: @precision))

rescue InfluxDB2::InfluxError => ie
  @logger.warn("HTTP communication error while writing to InfluxDB", :exception => ie)
rescue Exception => e
  @logger.warn("Non recoverable exception while writing to InfluxDB", :exception => e)
end

#registerObject



23
24
25
26
27
28
# File 'lib/logstash/outputs/influxdb2.rb', line 23

def register
  @precision = @options.fetch("precision", InfluxDB2::DEFAULT_WRITE_PRECISION)
  @client = InfluxDB2::Client.new(@url, @token.value, **_to_kwargs(@options))
  write_options = InfluxDB2::WriteOptions.new(**_to_kwargs(@write_options))
  @write_api = @client.create_write_api(write_options: write_options)
end