Class: Fluent::ECSFilter
Inherits: Filter
Hierarchy: Object, Filter, Fluent::ECSFilter
Defined in: lib/fluent/plugin/filter_ecs_filter.rb
Overview
Reads ECS task metadata from Docker container labels and adds it to Fluentd log records to make the logs more useful.
Instance Method Summary
-
#configure(conf) ⇒ Object
Get the configuration for the plugin.
-
#filter_stream(tag, es) ⇒ Object
Gets the log event stream and modifies it.
-
#get_container_id_from_record(record) ⇒ Object
If the user has configured container_id_attr the container id can be gathered from the record if it has been inserted there.
-
#get_container_id_from_tag(tag) ⇒ Object
Gets the container id from the last element in the tag.
-
#get_container_metadata(id) ⇒ Object
Queries the Docker API for the container and pulls the ECS data from its labels.
-
#get_ecs_data(container_id) ⇒ Object
Gets the ecs data about a container from the cache or calls the Docker api to retrieve the data about the container and store it in the cache.
-
#merge_json_log(record) ⇒ Object
Look at the log value and if it is valid json then we will parse the json and merge it into the log record.
-
#modify_record(record, ecs_data) ⇒ Object
Injects the ecs data into the record and also merges the json log if that configuration is enabled.
Instance Method Details
#configure(conf) ⇒ Object
Get the configuration for the plugin
# File 'lib/fluent/plugin/filter_ecs_filter.rb', line 31
def configure(conf)
  super

  require 'docker-api'
  require 'lru_redux'
  require 'oj'
  require 'time'
  require 'vine'

  @cache_ttl = :none if @cache_ttl < 0
  @cache = LruRedux::TTL::ThreadSafeCache.new(@cache_size, @cache_ttl)
end
#filter_stream(tag, es) ⇒ Object
Gets the log event stream and modifies it. This is where the plugin hooks into the fluentd event stream.
# File 'lib/fluent/plugin/filter_ecs_filter.rb', line 47
def filter_stream(tag, es)
  new_es = MultiEventStream.new
  container_id_from_tag = nil
  if container_id_attr.nil?
    container_id_from_tag = get_container_id_from_tag(tag)
  end

  es.each do |time, record|
    #puts "OMG #{container_id_from_tag}"
    if container_id_from_tag.nil?
      puts "GETTING FROM RECORD"
      container_id = get_container_id_from_record(record)
      puts "GOT CONTAINER ID #{container_id} from record"
    else
      container_id = container_id_from_tag
    end
    next unless container_id
    new_es.add(time, modify_record(record, get_ecs_data(container_id)))
  end
  new_es
end
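The control flow above can be tried without fluentd installed. The following is a minimal sketch, not the plugin's code: `filter_events` and `ecs_lookup` are hypothetical names, the event stream is modelled as an array of `[time, record]` pairs, and the container id is read with a plain hash lookup rather than the plugin's helper.

```ruby
# Sketch of the filter_stream flow using plain arrays instead of a fluentd
# MultiEventStream. filter_events and ecs_lookup are hypothetical names.
def filter_events(tag, events, ecs_lookup:, container_id_attr: nil)
  # With no attribute configured, every record shares the id taken from the tag.
  id_from_tag = container_id_attr.nil? ? tag.split('.').last : nil

  events.each_with_object([]) do |(time, record), out|
    container_id = id_from_tag || record[container_id_attr]
    next unless container_id # records without a container id are dropped

    out << [time, record.merge(ecs_lookup.call(container_id))]
  end
end

lookup = ->(id) { { 'task_id' => "task-#{id}" } }
events = [[1, { 'container_id' => 'abc', 'log' => 'a' }], [2, { 'log' => 'b' }]]

# Only the first record carries a container id, so only it survives.
filter_events('docker.xyz', events, ecs_lookup: lookup, container_id_attr: 'container_id')
```

When `container_id_attr` is omitted, both records are kept and both receive the data looked up for the id `xyz` taken from the tag.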
#get_container_id_from_record(record) ⇒ Object
If the user has configured container_id_attr the container id can be gathered from the record if it has been inserted there. If no container_id can be found, the record is not processed.
Attributes:
-
record - The record that is being transformed by the filter
Returns:
-
A docker container id
# File 'lib/fluent/plugin/filter_ecs_filter.rb', line 137
def get_container_id_from_record(record)
  record.access(@container_id_attr)
end
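`Hash#access` is not core Ruby; it appears to come from the `vine` gem that `configure` requires. As a rough illustration, and assuming `access` resolves a dot-separated path through nested hashes, a similar lookup can be sketched with the standard `Hash#dig`. The helper name `lookup_container_id` and the sample record are hypothetical.

```ruby
# Stand-in for record.access(path): walk a dot-separated path with Hash#dig.
# Assumption: the configured attribute is a dotted path such as
# "docker.container_id".
def lookup_container_id(record, container_id_attr)
  record.dig(*container_id_attr.split('.'))
end

record = { 'docker' => { 'container_id' => 'abc123' }, 'log' => 'hello' }

lookup_container_id(record, 'docker.container_id') # the nested id
lookup_container_id(record, 'docker.missing')      # nil, so the record is skipped
```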
#get_container_id_from_tag(tag) ⇒ Object
Gets the container id from the last element in the tag. This is used when the user has not configured container_id_attr; otherwise the container id is gathered from the record.
Attributes:
-
tag - The tag of the log being processed
Returns:
-
A docker container id
# File 'lib/fluent/plugin/filter_ecs_filter.rb', line 125
def get_container_id_from_tag(tag)
  tag.split('.').last
end
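A short illustration of the tag convention this relies on. The tag value below is made up, and `container_id_from_tag` is a standalone copy of the one-line method so it can be run outside the plugin.

```ruby
# Standalone copy of the tag parsing: the id is whatever follows the last dot.
def container_id_from_tag(tag)
  tag.split('.').last
end

container_id_from_tag('docker.container.4f2a9c1b7e') # => "4f2a9c1b7e"
```

This only yields a usable id if the log source is configured to put the container id in the final tag segment.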
#get_container_metadata(id) ⇒ Object
Queries the Docker API for the container and pulls the ECS data from its labels.
Attributes:
-
id - The id of the container to look at for ECS metadata.
Returns:
-
A hash that describes an ECS task gathered from the Docker API
# File 'lib/fluent/plugin/filter_ecs_filter.rb', line 104
def get_container_metadata(id)
  task_data = {}
  container = Docker::Container.get(id)
  if container
    labels = container.json['Config']['Labels']
    task_data['task_family'] = labels['com.amazonaws.ecs.task-definition-family']
    task_data['task_family'].prepend(@task_family_prepend) if @task_family_prepend
    task_data['task_version'] = labels['com.amazonaws.ecs.task-definition-version']
    task_data['task_id'] = labels['com.amazonaws.ecs.task-arn'].split('/').last
  end
  task_data
end
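The label handling can be exercised without a Docker daemon. The sketch below takes a labels hash directly; `task_data_from_labels` is a hypothetical helper and the label values are invented. It builds the prefixed family name with string interpolation instead of `String#prepend`, so the input hash is not mutated.

```ruby
# Sketch: derive the task hash from a container's labels. No Docker call is
# made; the labels hash is passed in directly.
def task_data_from_labels(labels, task_family_prepend = nil)
  family = labels['com.amazonaws.ecs.task-definition-family']
  {
    'task_family'  => "#{task_family_prepend}#{family}",
    'task_version' => labels['com.amazonaws.ecs.task-definition-version'],
    # The task id is the final path segment of the task ARN.
    'task_id'      => labels['com.amazonaws.ecs.task-arn'].split('/').last
  }
end

labels = {
  'com.amazonaws.ecs.task-definition-family'  => 'web',
  'com.amazonaws.ecs.task-definition-version' => '12',
  'com.amazonaws.ecs.task-arn' =>
    'arn:aws:ecs:us-east-1:123456789012:task/0b69d5c0-d655-4695-98cd-5d2d526d9d5a'
}

task_data_from_labels(labels, 'prod-')
```

Like the method above, this raises if the task ARN label is absent, which would be the case for a container not started by ECS.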
#get_ecs_data(container_id) ⇒ Object
Gets the ECS data about a container from the cache, or calls the Docker API to retrieve the data about the container and stores it in the cache.
Attributes:
-
container_id - The container_id where the log record originated from.
Returns:
-
A hash of data that describes an ECS task
# File 'lib/fluent/plugin/filter_ecs_filter.rb', line 92
def get_ecs_data(container_id)
  @cache.getset(container_id) do
    get_container_metadata(container_id)
  end
end
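The `getset` pattern returns the cached value when the key is present and otherwise runs the block, stores its result, and returns it. The sketch below imitates that behaviour with a plain Hash; `TinyCache` is a hypothetical stand-in and has none of the size limit, TTL, or thread safety of the cache built in `configure`.

```ruby
# Hypothetical stand-in for the cache's getset: compute on miss, reuse on hit.
class TinyCache
  def initialize
    @store = {}
  end

  def getset(key)
    return @store[key] if @store.key?(key)

    @store[key] = yield
  end
end

cache = TinyCache.new
docker_calls = 0

# The block stands in for the Docker API lookup and runs only on a cache miss.
fetch = lambda do |id|
  cache.getset(id) do
    docker_calls += 1
    { 'task_id' => "task-#{id}" }
  end
end

fetch.call('abc')
fetch.call('abc')
docker_calls # 1: the second call was served from the cache
```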
#merge_json_log(record) ⇒ Object
Looks at the log value and, if it is valid JSON, parses it and merges it into the log record. If the record has a 'namespace' value, the parsed JSON is placed under that key instead; otherwise it is merged at the top level, with existing record keys taking precedence.
Attributes:
-
record - The record we are transforming in the fluentd event stream.
Examples
# Docker captures stdout and passes it in the 'log' record attribute.
# We try to discover if the value of 'log' is JSON; if it is, then we
# will parse the JSON and add the keys and values to the record.
Returns:
-
A record hash that has json log data merged into the record
# File 'lib/fluent/plugin/filter_ecs_filter.rb', line 152
def merge_json_log(record)
  if record.key?('log')
    log = record['log'].strip
    namespace = record['namespace']
    if log[0].eql?('{') && log[-1].eql?('}')
      begin
        log_json = Oj.load(log)
        if namespace
          record[namespace] = log_json
        else
          record = log_json.merge(record)
        end
      rescue Oj::ParseError
      end
    end
  end
  record
end
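To see the merge rules in isolation, here is a sketch that swaps Oj for the standard library's `JSON` so it runs without extra gems. `merge_json_log_sketch` is a hypothetical name; unlike the method above it always returns a new hash rather than modifying the record in place.

```ruby
require 'json'

# Sketch of the merge rules using stdlib JSON in place of Oj.
def merge_json_log_sketch(record)
  return record unless record.key?('log')

  log = record['log'].strip
  # Cheap pre-check: only attempt to parse values that look like a JSON object.
  return record unless log.start_with?('{') && log.end_with?('}')

  begin
    parsed = JSON.parse(log)
  rescue JSON::ParserError
    return record # not valid JSON after all; leave the record untouched
  end

  namespace = record['namespace']
  if namespace
    record.merge(namespace => parsed) # nest the parsed log under the namespace key
  else
    parsed.merge(record)              # existing record keys win on conflict
  end
end

merge_json_log_sketch({ 'log' => '{"level":"info","log":"inner"}', 'task_id' => 'abc' })
# 'level' is added; 'log' keeps the original raw string because record keys win

merge_json_log_sketch({ 'log' => '{"level":"info"}', 'namespace' => 'app' })
# the parsed hash is stored under the 'app' key
```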
#modify_record(record, ecs_data) ⇒ Object
Injects the ECS data into the record and also merges the JSON log if that configuration is enabled.
Attributes:
-
record - The log record being processed
ecs_data - The ECS data retrieved from the docker container
Returns:
-
A record hash that has ECS data and optionally log data added
# File 'lib/fluent/plugin/filter_ecs_filter.rb', line 79
def modify_record(record, ecs_data)
  modified_record = record.merge(ecs_data)
  modified_record = merge_json_log(modified_record) if @merge_json_log
  modified_record
end
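One consequence of `record.merge(ecs_data)` worth noting: when both hashes share a key, the ECS value replaces the one in the record. A small illustration with made-up values:

```ruby
# Hash#merge gives precedence to the argument, so ECS data overrides
# same-named keys already present in the record.
record   = { 'log' => 'started', 'task_id' => 'from-app' }
ecs_data = { 'task_family' => 'web', 'task_version' => '12', 'task_id' => 'from-ecs' }

modified = record.merge(ecs_data)
modified['task_id'] # => "from-ecs"
record['task_id']   # => "from-app" (merge returns a new hash)
```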