fluent-plugin-aggregate, a plugin for Fluentd

A fluentd plugin to aggregate events by fields over time.

Installation

Add this line to your application's Gemfile:

gem 'fluent-plugin-aggregate'

And then execute:

$ bundle

Or install it yourself as:

$ gem install fluent-plugin-aggregate --no-document

Requirements

  • Ruby 2.1 or later
  • fluentd v0.12 or later
  • aggregate v0.0.1 or later

Usage

Filter plugin (@type 'aggregate')

Aggregates events over time, grouped by fields.

<filter>
  @type aggregate
  intervals 5s
  keep_interval 1s
  group_fields field_group1,field_group2
  aggregate_fields numeric_field1,numeric_field2
  aggregations mean,median
</filter>

Common parameters

intervals

Intervals for the aggregations. This plugin supports multiple aggregation intervals.

intervals 5s,10s,20s
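
To illustrate what multi-interval aggregation means, here is a minimal Ruby sketch that maps one event timestamp to a window for each configured interval. It assumes windows are aligned to the Unix epoch; the plugin may align them differently, and window_starts is a hypothetical helper, not part of the plugin.

```ruby
# Hypothetical helper (not part of the plugin): for each interval length in
# seconds, return the start of the epoch-aligned window containing `epoch`.
def window_starts(epoch, intervals)
  intervals.map { |seconds| [seconds, epoch - (epoch % seconds)] }.to_h
end

# One event falls into one window per interval (5s, 10s and 20s here).
window_starts(1_000_007, [5, 10, 20]).each do |seconds, start|
  puts "#{seconds}s window starts at #{start}"
end
```

Under this assumption the same event contributes to three independent aggregates, one per interval.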

keep_interval

Additional time to wait for late events to arrive (useful when events are delayed at the origin).

keep_interval 5s

group_fields

Fields to group events by (like GROUP BY in SQL).

group_fields tx,region

aggregate_fields

Fields to apply the aggregation functions to (mean, median, sum, etc.). Multiple fields are supported.

aggregate_fields response_time,pressure

aggregations

Aggregate functions to apply. Multiple functions are supported.

aggregations sum,min,max,mean,median,variance,standard_deviation
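
The following plain-Ruby sketch shows how group_fields, aggregate_fields and aggregations relate to each other. It is not the plugin's implementation: the aggregate method is hypothetical, it handles a single aggregate field, and it assumes population variance (dividing by n), which may differ from what the plugin computes.

```ruby
# Hypothetical sketch: group events by `group_fields`, then compute the
# aggregations listed above for one numeric field.
def aggregate(events, group_fields, aggregate_field)
  grouped = events.group_by { |e| group_fields.map { |f| e[f] } }
  grouped.map do |key, group|
    values = group.map { |e| e[aggregate_field] }.sort
    n = values.size
    sum = values.reduce(:+)
    mean = sum.to_f / n
    mid = n / 2
    median = n.odd? ? values[mid] : (values[mid - 1] + values[mid]) / 2.0
    # Assumption: population variance (divide by n, not n - 1).
    variance = values.map { |v| (v - mean)**2 }.reduce(:+) / n
    [key, { sum: sum, min: values.first, max: values.last, mean: mean,
            median: median, variance: variance,
            standard_deviation: Math.sqrt(variance) }]
  end.to_h
end

events = [
  { "tx" => "a", "region" => "eu", "response_time" => 100 },
  { "tx" => "a", "region" => "eu", "response_time" => 300 },
  { "tx" => "b", "region" => "us", "response_time" => 50 }
]
aggregate(events, %w[tx region], "response_time").each do |key, stats|
  puts "#{key.join(',')}: #{stats}"
end
```

Each distinct combination of group field values produces its own set of statistics, much like one output row per group in a SQL GROUP BY query.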

aggregate_event_tag

Default: aggregate

Tag prefix for events generated in the aggregation process. Full tag format is #aggregate_event_tag.#interval.

aggregate_event_tag aggregate

Example

Example with dummy input.

<system>
  workers 1
</system>
<source>
  @type dummy
  dummy {"tx":"test", "response_ms":500}
  tag test
  rate 1
</source>
<filter test>
  @type aggregate
  intervals 5s,10s
  keep_interval 1s
  group_fields tx
  aggregate_fields response_ms
  aggregator_suffix_name "aggregator#{worker_id}"
  aggregate_event_tag aggregate
</filter>
<match test>
  @type stdout
</match>
<match aggregate.**>
  @type stdout
</match>

Advanced parameters

processing_mode

Default: online

Mode for processing events (batch or online). In online mode (useful for stream processing), the plugin waits for events to arrive before generating aggregate events: for example, with an interval of 5s it waits 5s + keep_interval. In batch mode it does not wait.

processing_mode online
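
The waiting rule described above can be sketched in Ruby as follows. This is only an illustration under stated assumptions: emit_ready? is a hypothetical helper, all times are in seconds, and batch mode is modelled simply as "never wait".

```ruby
# Hypothetical helper (not part of the plugin): decide whether the aggregate
# for a window may be emitted at time `now`.
def emit_ready?(mode, window_start, interval, keep_interval, now)
  # Batch mode: no waiting (assumed here to mean "emit immediately").
  return true if mode == "batch"
  # Online mode: wait for the interval plus keep_interval to elapse.
  now >= window_start + interval + keep_interval
end

# Window starting at t=100 with interval 5s and keep_interval 1s.
puts emit_ready?("online", 100, 5, 1, 105)
puts emit_ready?("online", 100, 5, 1, 106)
puts emit_ready?("batch", 100, 5, 1, 100)
```

In this sketch a larger keep_interval trades latency for tolerance of late events.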

time_field

Default: timestamp

Field that contains the time of the event.

time_field timestamp

time_format

Default: %Y-%m-%dT%H:%M:%S.%L%:z

Time format for the time_field.

time_format %Y-%m-%dT%H:%M:%S.%L%:z

output_time_format

Default: %Y-%m-%dT%H:%M:%S.%L%:z

Time format for the generated aggregated event.

output_time_format %Y-%m-%dT%H:%M:%S.%L%:z
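
The default format is a standard strftime pattern. As a quick illustration of what a timestamp in this format looks like, the Ruby sketch below formats a fixed UTC time; format_time is a hypothetical helper used only for this example.

```ruby
# Hypothetical helper: format a Time with the default pattern shown above.
def format_time(time, format = "%Y-%m-%dT%H:%M:%S.%L%:z")
  time.strftime(format)
end

# 2020-01-02 03:04:05.678 UTC (the last argument is microseconds).
puts format_time(Time.utc(2020, 1, 2, 3, 4, 5, 678_000))
# prints 2020-01-02T03:04:05.678+00:00
```

Here %L is the millisecond part and %:z is the UTC offset written with a colon.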

field_no_data_value

Default: no_data

Value used in the aggregate event for group fields that are not present in the original event.

field_no_data_value no_data
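
A minimal Ruby sketch of the idea, assuming the placeholder is substituted when the group key is built; group_key is a hypothetical helper and the plugin may apply the value at a different point.

```ruby
# Hypothetical helper (not part of the plugin): build the group key for an
# event, substituting a placeholder for missing group fields.
def group_key(event, group_fields, no_data_value = "no_data")
  group_fields.map { |field| event.fetch(field, no_data_value) }
end

# The event has no "region" field, so the placeholder is used instead.
puts group_key({ "tx" => "test" }, %w[tx region]).inspect
# prints ["test", "no_data"]
```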

emit_original_message

Default: true

Whether to emit the original events in addition to the aggregate events.

emit_original_message true

temporary_status_file_path

Default: nil

File in which to store aggregation state when the agent goes down.

temporary_status_file_path path_to_file.json

load_temporarystatus_file_enabled

Default: true

Whether to load the file set in temporary_status_file_path on startup.

load_temporarystatus_file_enabled true