Fluentd filter plugin to deduplicate records for InfluxDB

A filter plugin that implements the deduplication techniques described in the InfluxDB doc.

Installation

Using RubyGems:

fluent-gem install fluent-plugin-influxdb-deduplication

Configuration

Deduplicate by incrementing the timestamp

The filter plugin reads the fluentd record event time with a precision to the second, and stores it in the time_key field. Any following record with the same timestamp has a time_key incremented by 1 nanosecond.

<filter pattern>
  @type influxdb_deduplication

  # field to store the deduplicated timestamp
  time_key my_key_field
</filter>

For example, the following input records:

1613910640 { "k1" => 0, "k2" => "value0" }
1613910640 { "k1" => 1, "k2" => "value1" }
1613910640 { "k1" => 2, "k2" => "value2" }
1613910641 { "k1" => 3, "k3" => "value3" }

Would create on output:

1613910640 { "k1" => 0, "k2" => "value0", "my_key_field" => 1613910640000000000 }
1613910640 { "k1" => 1, "k2" => "value1", "my_key_field" => 1613910640000000001 }
1613910640 { "k1" => 2, "k2" => "value2", "my_key_field" => 1613910640000000002 }
1613910641 { "k1" => 3, "k3" => "value3", "my_key_field" => 1613910643000000000 }

The time key field can then be passed as is to the fluent-plugin-influxdb-v2. Example configuration on nginx logs:

<filter nginx.access>
  @type influxdb_deduplication

  # field to store the deduplicated timestamp
  time_key my_key_field
</filter>

<match nginx.access>
    @type influxdb2

    # setup the access to your InfluxDB v2 instance
    url             https://localhost:8086
    token           my-token
    bucket          my-bucket
    org             my-org

    # the influxdb2 timekey must be set to the same value as the influxdb_deduplication time_key
    time_key my_key_field

    # the timestamp precision must be set to ns
    time_precision ns

    tag_keys ["request_method", "status"]
    field_keys ["remote_addr", "request_uri"]
</match>

The data can then be queried as a table and viewed in Grafana for example with the flux query:

from(bucket: "my-bucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> pivot(
    rowKey:["_time"],
    columnKey: ["_field"],
    valueColumn: "_value"
  )
  |> keep(columns: ["_time", "request_method", "status", "remote_addr", "request_uri"])

Deduplicate by adding a sequence tag

TODO