Class: InfluxDBOutput

Inherits:
Fluent::Plugin::Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_influxdb2.rb

Overview

A buffered output plugin that writes Fluentd events to InfluxDB 2.

Constant Summary collapse

# Name of the buffer type used by default; frozen so the shared string cannot be mutated.
DEFAULT_BUFFER_TYPE = 'memory'.freeze

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/fluent/plugin/out_influxdb2.rb', line 77

# Validates the plugin configuration and prepares the timestamp conversion.
#
# After the parent class has processed +conf+, this picks a lambda that converts a
# nanosecond timestamp into the configured +@time_precision+ and resolves the
# matching InfluxDB2 write precision.
#
# @param conf [Object] the configuration passed in by the caller; forwarded to
#   +compat_parameters_convert+ and to +super+
# @raise [Fluent::ConfigError] if +@time_precision+ is not one of 'ns', 'us', 'ms', 's',
#   or if +@url+ is empty
def configure(conf)
  compat_parameters_convert(conf, :inject)
  super

  # Divisor that scales nanoseconds down to the target unit; nil means "already nanoseconds".
  divisor =
    case @time_precision
    when 'ns' then nil
    when 'us' then 1e3
    when 'ms' then 1e6
    when 's' then 1e9
    else
      raise Fluent::ConfigError, "The time precision #{@time_precision} is not supported. You should use: " \
                                 'second (s), millisecond (ms), microsecond (us), or nanosecond (ns).'
    end

  @precision_formatter =
    if divisor
      ->(ns_time) { (ns_time / divisor).round }
    else
      ->(ns_time) { ns_time }
    end

  @precision = InfluxDB2::WritePrecision.new.get_from_value(@time_precision)
  raise Fluent::ConfigError, 'The InfluxDB URL should be defined.' if @url.empty?
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


110
111
112
# File 'lib/fluent/plugin/out_influxdb2.rb', line 110

# Unconditionally answers +true+. Judging by its name, this tells the Fluentd
# runtime that the plugin may be used when several workers are configured.
#
# @return [Boolean] always +true+
def multi_workers_ready?
  true
end

#shutdown ⇒ Object



105
106
107
108
# File 'lib/fluent/plugin/out_influxdb2.rb', line 105

# Runs the parent shutdown and then closes the InfluxDB client.
#
# The client only exists once +start+ has created it, so the close is skipped
# when +@client+ is still nil instead of failing with NoMethodError.
#
# @return [Object, nil] the result of +@client.close!+, or nil when there is no client
def shutdown
  super
  @client.close! if @client
end

#start ⇒ Object



97
98
99
100
101
102
103
# File 'lib/fluent/plugin/out_influxdb2.rb', line 97

# Runs the parent start, logs the connection settings, and creates the InfluxDB
# client together with the write API that +write+ uses.
#
# The token is passed to the client but is not included in the log line.
#
# @return [Object] the write API object stored in +@write_api+
def start
  super
  log.info "Connecting to InfluxDB: url: #{@url}, bucket: #{@bucket}, org: #{@org}, precision = #{@precision}, " \
           "use_ssl = #{@use_ssl}"

  client_options = { bucket: @bucket, org: @org, precision: @precision, use_ssl: @use_ssl }
  @client = InfluxDB2::Client.new(@url, @token, **client_options)
  @write_api = @client.create_write_api
end

#write(chunk) ⇒ Object



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/fluent/plugin/out_influxdb2.rb', line 114

# Converts every event in a buffer chunk into an InfluxDB point and writes the
# whole batch through +@write_api+.
#
# Each point is named after +@measurement+, falling back to the chunk's tag.
# Every record entry is handed to +_parse_field+ (defined elsewhere in the plugin),
# except the entry whose key equals +@time_key+: its value replaces the event
# timestamp. When +@tag_fluentd+ is set, the chunk tag is attached as the
# 'fluentd' tag of each point.
#
# @param chunk [#metadata, #msgpack_each] buffer chunk; +msgpack_each+ must yield
#   +(time, record)+ pairs where +time+ is an Integer or responds to +sec+/+nsec+
# @return [Object] whatever +log.debug+ returns
def write(chunk)
  tag = chunk.metadata.tag
  measurement = @measurement || tag
  points = []

  chunk.msgpack_each do |time, record|
    time_formatted =
      if time.is_a?(Integer)
        time
      else
        # Integer arithmetic keeps the value exact: current epoch nanoseconds
        # exceed 2**53 and would be rounded if computed as a Float.
        @precision_formatter.call(time.sec * 1_000_000_000 + time.nsec)
      end

    point = InfluxDB2::Point.new(name: measurement)
    record.each_pair do |key, value|
      if key.eql?(@time_key)
        # An explicit time entry in the record takes priority over the event time.
        time_formatted = value
      else
        _parse_field(key, value, point)
      end
    end
    # Does not depend on the record entries, so it is applied once per point.
    point.add_tag('fluentd', tag) if @tag_fluentd
    point.time(time_formatted, @precision)
    points << point
  end

  @write_api.write(data: points)
  log.debug "Written points: #{points}"
end