Class: Fluent::ECSFilter
- Inherits: Filter (full chain: Object, Filter, Fluent::ECSFilter)
- Defined in: lib/fluent/plugin/filter_ecs_filter.rb
Overview
Parses ECS data from docker to make fluentd logs more useful.
Instance Method Summary
-
#configure(conf) ⇒ Object
Get the configuration for the plugin.
-
#filter_stream(tag, es) ⇒ Object
Gets the log event stream and modifies it.
-
#get_container_id_from_record(record) ⇒ Object
If the user has configured container_id_attr the container id can be gathered from the record if it has been inserted there.
-
#get_container_id_from_tag(tag) ⇒ Object
Gets the container id from the last element in the tag.
-
#get_container_metadata(id) ⇒ Object
Goes out to docker container to pull ecs data from labels.
-
#get_ecs_data(container_id) ⇒ Object
Gets the ecs data about a container from the cache or calls the Docker api to retrieve the data about the container and store it in the cache.
-
#merge_json_log(record) ⇒ Object
Look at the log value and if it is valid json then we will parse the json and merge it into the log record.
-
#modify_record(record, ecs_data) ⇒ Object
Injects the ecs data into the record and also merges the json log if that configuration is enabled.
Instance Method Details
#configure(conf) ⇒ Object
Get the configuration for the plugin.

    # File 'lib/fluent/plugin/filter_ecs_filter.rb', line 31
    def configure(conf)
      super

      require 'docker-api'
      require 'lru_redux'
      require 'oj'
      require 'time'

      @cache_ttl = :none if @cache_ttl < 0

      @cache = LruRedux::TTL::ThreadSafeCache.new(@cache_size, @cache_ttl)
    end
#filter_stream(tag, es) ⇒ Object
Gets the log event stream and modifies it. This is where the plugin hooks into the fluentd event stream.

    # File 'lib/fluent/plugin/filter_ecs_filter.rb', line 46
    def filter_stream(tag, es)
      new_es = MultiEventStream.new

      container_id = get_container_id_from_tag(tag)

      es.each do |time, record|
        container_id = get_container_id_from_record(record) if container_id.empty?
        next unless container_id
        new_es.add(time, modify_record(record, get_ecs_data(container_id)))
      end

      new_es
    end
#get_container_id_from_record(record) ⇒ Object
If the user has configured container_id_attr the container id can be gathered from the record if it has been inserted there. If no container_id can be found, the record is not processed.
Attributes:
-
record - The record that is being transformed by the filter
Returns:
-
A docker container id

    # File 'lib/fluent/plugin/filter_ecs_filter.rb', line 127
    def get_container_id_from_record(record)
      record[@container_id_attr]
    end
#get_container_id_from_tag(tag) ⇒ Object
Gets the container id from the last element in the tag. If the tag yields an empty id and the user has configured container_id_attr, the id is read from the record instead (see #get_container_id_from_record).
Attributes:
-
tag - The tag of the log being processed
Returns:
-
A docker container id

    # File 'lib/fluent/plugin/filter_ecs_filter.rb', line 115
    def get_container_id_from_tag(tag)
      tag.split('.').last
    end
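The tag lookup can be tried in isolation. The following is a minimal sketch, assuming tags whose final dot-separated element is the container id; the helper name `container_id_from_tag` and the sample tag are illustrative and not part of the plugin.

```ruby
# Hypothetical standalone version of the tag lookup, for illustration only.
def container_id_from_tag(tag)
  # The container id is assumed to be the final dot-separated element.
  tag.split('.').last
end

tag = 'docker.var.lib.containers.4f2a9c1b7d3e' # assumed tag layout
puts container_id_from_tag(tag)
```

Note that an empty tag produces `nil` rather than an empty string, because splitting an empty string returns an empty array.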
#get_container_metadata(id) ⇒ Object
Goes out to docker container to pull ecs data from labels.
Attributes:
-
id - The id of the container to look at for ECS metadata.
Returns:
-
A hash that describes an ECS task gathered from the Docker API

    # File 'lib/fluent/plugin/filter_ecs_filter.rb', line 94
    def get_container_metadata(id)
      task_data = {}
      container = Docker::Container.get(id)
      if container
        labels = container.json['Config']['Labels']
        task_data['task_family'] = labels['com.amazonaws.ecs.task-definition-family']
        task_data['task_family'].prepend(@task_family_prepend) if @task_family_prepend
        task_data['task_version'] = labels['com.amazonaws.ecs.task-definition-version']
        task_data['task_id'] = labels['com.amazonaws.ecs.task-arn'].split('/').last
      end
      task_data
    end
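To see what the label lookup produces without a running Docker daemon, the sketch below applies the same three label keys to a plain hash. The helper name `task_data_from_labels`, the prefix argument, and all label values are made up for illustration; only the label keys come from the listing above.

```ruby
# Hypothetical helper mirroring the label lookup, without calling the Docker API.
def task_data_from_labels(labels, task_family_prepend = nil)
  task_data = {}
  family = labels['com.amazonaws.ecs.task-definition-family']
  # Build a new string instead of mutating the label value in place.
  task_data['task_family'] = "#{task_family_prepend}#{family}"
  task_data['task_version'] = labels['com.amazonaws.ecs.task-definition-version']
  # The task id is the part of the ARN after the last slash.
  task_data['task_id'] = labels['com.amazonaws.ecs.task-arn'].split('/').last
  task_data
end

# Made-up label values in the shape the plugin expects to find on a container.
labels = {
  'com.amazonaws.ecs.task-definition-family' => 'web',
  'com.amazonaws.ecs.task-definition-version' => '12',
  'com.amazonaws.ecs.task-arn' => 'arn:aws:ecs:us-east-1:123456789012:task/0a1b2c3d'
}
p task_data_from_labels(labels, 'prod-')
```

Unlike the plugin method, this sketch interpolates the prefix into a new string, which avoids modifying the label string held by the container data.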
#get_ecs_data(container_id) ⇒ Object
Gets the ecs data about a container from the cache or calls the Docker api to retrieve the data about the container and store it in the cache.
Attributes:
-
container_id - The container_id where the log record originated from.
Returns:
-
A hash of data that describes an ECS task

    # File 'lib/fluent/plugin/filter_ecs_filter.rb', line 82
    def get_ecs_data(container_id)
      @cache.getset(container_id) do
        get_container_metadata(container_id)
      end
    end
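The get-or-compute pattern behind `getset` can be illustrated with a plain Hash. This is a sketch only: `SimpleCache` is a hypothetical stand-in with no size limit and no TTL, unlike the cache built in #configure, and the `fetch` lambda simulates the Docker lookup.

```ruby
# Stand-in for the cache: a plain Hash with a getset-style helper.
# Assumption: no size limit or TTL, unlike the cache built in #configure.
class SimpleCache
  def initialize
    @store = {}
  end

  # Return the cached value, or run the block once and remember its result.
  def getset(key)
    return @store[key] if @store.key?(key)

    @store[key] = yield
  end
end

cache = SimpleCache.new
lookups = 0
fetch = lambda do |id|
  lookups += 1 # counts simulated Docker API calls
  { 'task_id' => "task-for-#{id}" }
end

# Two reads of the same container id should trigger a single lookup.
2.times { cache.getset('abc123') { fetch.call('abc123') } }
puts lookups
```

Caching matters here because #filter_stream calls #get_ecs_data for every record, while the metadata for a given container does not change between records.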
#merge_json_log(record) ⇒ Object
Looks at the log value and, if it is valid JSON, parses it and merges it into the log record. If a namespace is present, the parsed JSON is placed under that key instead.
Attributes:
-
record - The record we are transforming in the fluentd event stream.
Examples

    # Docker captures stdout and passes it in the 'log' record attribute.
    # We try to discover whether the value of 'log' is JSON; if it is, we
    # parse the JSON and add the keys and values to the record.

Returns:
-
A record hash that has json log data merged into the record

    # File 'lib/fluent/plugin/filter_ecs_filter.rb', line 142
    def merge_json_log(record)
      if record.key?('log')
        log = record['log'].strip
        namespace = record['namespace']
        if log[0].eql?('{') && log[-1].eql?('}')
          begin
            log_json = Oj.load(log)
            if namespace
              record[namespace] = log_json
            else
              record = log_json.merge(record)
            end
          rescue Oj::ParseError
          end
        end
      end
      record
    end
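The merge behaviour can be sketched with the standard library alone. The version below swaps Oj for Ruby's built-in `JSON` module and returns a new hash rather than modifying the record, so it is an approximation of the listing above, not the plugin's code; the name `merge_json_log_sketch` and the sample record are hypothetical.

```ruby
require 'json'

# Illustrative re-implementation using the standard library's JSON
# instead of Oj; the method name is hypothetical.
def merge_json_log_sketch(record)
  return record unless record.key?('log')

  log = record['log'].strip
  # Cheap pre-check: only attempt to parse values that look like a JSON object.
  return record unless log.start_with?('{') && log.end_with?('}')

  begin
    parsed = JSON.parse(log)
  rescue JSON::ParserError
    return record # not valid JSON after all; leave the record untouched
  end

  namespace = record['namespace']
  if namespace
    # Keep the parsed payload separate, under the namespace key.
    record.merge(namespace => parsed)
  else
    # Existing record keys win over keys parsed from the log line.
    parsed.merge(record)
  end
end

record = { 'log' => '{"level":"info","msg":"started"}', 'source' => 'stdout' }
p merge_json_log_sketch(record)
```

Because the parsed hash is the receiver of `merge`, a key that exists in both the log payload and the record keeps the record's value.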
#modify_record(record, ecs_data) ⇒ Object
Injects the ecs data into the record and also merges the json log if that configuration is enabled.
Attributes:
-
record - The log record being processed
-
ecs_data - The ECS data retrieved from the docker container
Returns:
-
A record hash that has ECS data and optionally log data added

    # File 'lib/fluent/plugin/filter_ecs_filter.rb', line 69
    def modify_record(record, ecs_data)
      modified_record = record.merge(ecs_data)
      modified_record = merge_json_log(modified_record) if @merge_json_log
      modified_record
    end
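The first step of this method relies on `Hash#merge`. The short sketch below, using made-up record and task values, shows the precedence that follows from it: when the record already contains a key that also appears in the ECS data, the ECS value replaces it.

```ruby
# Sample values are invented for illustration.
record   = { 'log' => 'GET /health 200', 'task_id' => 'from-record' }
ecs_data = { 'task_family' => 'web', 'task_version' => '12', 'task_id' => '0a1b2c3d' }

# Hash#merge returns a new hash; on key collisions the argument's value wins.
modified = record.merge(ecs_data)
p modified
```

The original record hash is left unchanged by this step, since `merge` is the non-mutating form.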