Class: Fluent::Plugin::MesosphereFilter
- Inherits: Filter
  - Object
  - Filter
  - Fluent::Plugin::MesosphereFilter
- Defined in: lib/fluent/plugin/filter_mesosphere_filter.rb
Overview
Parses Marathon and Chronos data from docker to make fluentd logs more useful.
Instance Method Summary
-
#configure(conf) ⇒ Object
Get the configuration for the plugin.
-
#filter_stream(tag, es) ⇒ Object
Gets the log event stream and modifies it.
-
#get_container_id_from_record(record) ⇒ Object
If the user has configured container_id_attr the container id can be gathered from the record if it has been inserted there.
-
#get_container_id_from_tag(tag) ⇒ Object
Gets the container id from the last element in the tag.
-
#get_container_metadata(id) ⇒ Object
Goes out to docker to get environment variables for a container.
-
#get_mesos_data(container_id) ⇒ Object
Gets the mesos data about a container from the cache or calls the Docker api to retrieve the data about the container and store it in the cache.
-
#merge_json_log(record) ⇒ Object
Look at the log value and if it is valid json then we will parse the json and merge it into the log record.
-
#modify_record(record, mesos_data) ⇒ Object
Injects the mesos framework data into the record and also merges the json log if that configuration is enabled.
-
#parse_env(env) ⇒ Object
Splits the env var on = and returns the value.
Instance Method Details
#configure(conf) ⇒ Object
Get the configuration for the plugin
    # File 'lib/fluent/plugin/filter_mesosphere_filter.rb', line 45
    def configure(conf)
      super
      # A negative cache_ttl is handed to the cache as :none.
      @cache_ttl = :none if @cache_ttl < 0
      @cache = LruRedux::TTL::ThreadSafeCache.new(@cache_size, @cache_ttl)
      @chronos_task_regex_compiled = Regexp.compile(@cronos_task_regex)
      @marathon_app_regex_compiled = Regexp.compile(@marathon_app_regex)
    end
#filter_stream(tag, es) ⇒ Object
Gets the log event stream and modifies it. This is where the plugin hooks into the fluentd event stream.
    # File 'lib/fluent/plugin/filter_mesosphere_filter.rb', line 59
    def filter_stream(tag, es)
      new_es = Fluent::MultiEventStream.new
      container_id = ''
      container_id = get_container_id_from_tag(tag) if get_container_id_tag
      es.each do |time, record|
        # Fall back to the record when the tag did not supply an id.
        container_id = get_container_id_from_record(record) if container_id.empty?
        next unless container_id
        new_es.add(time, modify_record(record, get_mesos_data(container_id)))
      end
      new_es
    end
#get_container_id_from_record(record) ⇒ Object
If the user has configured container_id_attr the container id can be gathered from the record if it has been inserted there. If no container_id can be found, the record is not processed.
Attributes:
-
record- The record that is being transformed by the filter
Returns:
-
A docker container id
    # File 'lib/fluent/plugin/filter_mesosphere_filter.rb', line 157
    def get_container_id_from_record(record)
      record[@container_id_attr]
    end
#get_container_id_from_tag(tag) ⇒ Object
Gets the container id from the last element in the tag. If the user has configured container_id_attr the container id can be gathered from the record if it has been inserted there.
Attributes:
-
tag- The tag of the log being processed
Returns:
-
A docker container id
    # File 'lib/fluent/plugin/filter_mesosphere_filter.rb', line 145
    def get_container_id_from_tag(tag)
      tag.split('.').last
    end
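As a small illustration of the tag handling, the snippet below applies the same expression to a made-up tag. The tag prefix and the container id are invented for the example; real tags depend on how the Docker log source is configured.

```ruby
# Hypothetical tag; the prefix and the id are invented for illustration.
tag = 'docker.var.lib.docker.containers.3f4e8a1b2c5d'

# Same expression the method uses: take the last dot-separated element.
container_id = tag.split('.').last

puts container_id
```

If the container id is not the final tag segment, this approach will not find it, and container_id_attr is the documented alternative.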
#get_container_metadata(id) ⇒ Object
Goes out to docker to get environment variables for a container. Then we parse the environment variables looking for known Marathon and Chronos environment variables.
Attributes:
-
id- The id of the container to look at for mesosphere metadata.
Returns:
-
A hash that describes a mesos task gathered from the Docker API
    # File 'lib/fluent/plugin/filter_mesosphere_filter.rb', line 110
    def get_container_metadata(id)
      task_data = {}
      container = Docker::Container.get(id)
      if container
        environment = container.json['Config']['Env']
        environment.each do |env|
          # Chronos puts task_id in lowercase, and Marathon does it with
          # uppercase
          if env =~ /MESOS_TASK_ID/i
            task_data['mesos_task_id'] = parse_env(env)
          elsif env.include? 'MARATHON_APP_ID'
            match_data = parse_env(env).match(@marathon_app_regex_compiled)
            task_data['mesos_framework'] = 'marathon'
            task_data['app'] = match_data['app'] if match_data
          elsif env.include? 'CHRONOS_JOB_NAME'
            match_data = parse_env(env).match(@chronos_task_regex_compiled)
            task_data['mesos_framework'] = 'chronos'
            task_data['app'] = match_data['app'] if match_data
            task_data['chronos_task_type'] = match_data['task_type'] if match_data && match_data.names.include?('task_type')
          elsif @namespace_env_var && env.include?(@namespace_env_var)
            task_data['namespace'] = parse_env(env)
          end
        end
      end
      task_data
    end
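To see what the resulting hash might look like without a Docker daemon, here is a minimal sketch that runs the MESOS_TASK_ID and MARATHON_APP_ID branches over a hand-written environment array. The helper names `env_value` and `task_data_from`, the default regex, and the sample environment are all assumptions made for the example; the plugin compiles its own pattern from its configuration.

```ruby
# Value after the last '=' (same approach as parse_env).
def env_value(env)
  env.split('=').last
end

# Builds a task hash from "KEY=value" strings. The default app_regex is a
# hypothetical pattern that captures the last path segment as 'app'.
def task_data_from(environment, app_regex = %r{(?<app>[^/]+)\z})
  task_data = {}
  environment.each do |env|
    if env =~ /MESOS_TASK_ID/i
      task_data['mesos_task_id'] = env_value(env)
    elsif env.include?('MARATHON_APP_ID')
      match_data = env_value(env).match(app_regex)
      task_data['mesos_framework'] = 'marathon'
      task_data['app'] = match_data['app'] if match_data
    end
  end
  task_data
end

# Made-up environment, shaped like the Config.Env array Docker returns.
env = ['PATH=/usr/bin', 'MESOS_TASK_ID=web.1234', 'MARATHON_APP_ID=/prod/web']
result = task_data_from(env)
p result
```

Variables that match none of the branches, such as PATH here, are simply ignored.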
#get_mesos_data(container_id) ⇒ Object
Gets the mesos data about a container from the cache or calls the Docker api to retrieve the data about the container and store it in the cache.
Attributes:
-
container_id- The container_id where the log record originated from.
Returns:
-
A hash of data that describes a mesos task
    # File 'lib/fluent/plugin/filter_mesosphere_filter.rb', line 96
    def get_mesos_data(container_id)
      # Return the cached entry, or look it up in Docker and cache the result.
      @cache.getset(container_id) do
        get_container_metadata(container_id)
      end
    end
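The get-or-compute pattern can be sketched with a plain Hash standing in for the plugin's cache. This is only an illustration: the Hash has no size limit or TTL, and `fetch_metadata` is a hypothetical replacement for the Docker lookup that counts its calls so cache hits are visible.

```ruby
# Stand-in for the plugin's cache: a plain Hash with no size limit or TTL.
cache = {}
lookups = 0

# Hypothetical lookup; counts calls so that cache hits are visible.
fetch_metadata = lambda do |container_id|
  lookups += 1
  { 'mesos_task_id' => "task-for-#{container_id}" }
end

# Returns the cached value, or runs the lookup once and stores its result.
get_mesos_data = lambda do |container_id|
  cache.fetch(container_id) { cache[container_id] = fetch_metadata.call(container_id) }
end

first = get_mesos_data.call('abc123')
second = get_mesos_data.call('abc123')
puts lookups
```

The second call is served from the Hash, so the lookup runs only once per container id.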
#merge_json_log(record) ⇒ Object
Look at the log value and if it is valid json then we will parse the json and merge it into the log record. If a namespace is present then the log record is placed under that key.
Attributes:
-
record- The record we are transforming in the fluentd event stream.
Examples
    # Docker captures stdout and passes it in the 'log' record attribute.
    # We try to discover if the value of 'log' is json; if it is, then we
    # will parse the json and add the keys and values to the record.
Returns:
-
A record hash that has json log data merged into the record
    # File 'lib/fluent/plugin/filter_mesosphere_filter.rb', line 184
    def merge_json_log(record)
      if record.key?('log')
        log = record['log'].strip
        namespace = record['namespace']
        # Only attempt a parse when the value looks like a JSON object.
        if log[0].eql?('{') && log[-1].eql?('}')
          begin
            log_json = Oj.load(log)
            if namespace
              record[namespace] = log_json
            else
              record = log_json.merge(record)
            end
          rescue Oj::ParseError
          end
        end
      end
      record
    end
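The merge behavior can be tried out with a standalone sketch. It makes two assumptions that differ from the plugin: Ruby's standard `JSON.parse` stands in for `Oj.load`, and the hypothetical `merge_json_log_sketch` returns a new hash instead of mutating the record. The sample records are invented.

```ruby
require 'json'

# Sketch of the merge step; JSON.parse stands in for the plugin's Oj.load.
def merge_json_log_sketch(record)
  return record unless record.key?('log')

  log = record['log'].strip
  return record unless log.start_with?('{') && log.end_with?('}')

  begin
    log_json = JSON.parse(log)
  rescue JSON::ParserError
    return record # looked like JSON but was not; leave the record untouched
  end

  namespace = record['namespace']
  if namespace
    # Parsed log goes under the namespace key.
    record.merge(namespace => log_json)
  else
    # Keys already on the record win over keys parsed from the log line.
    log_json.merge(record)
  end
end

flat = merge_json_log_sketch({ 'log' => '{"level":"info","msg":"started"}', 'app' => 'web' })
nested = merge_json_log_sketch({ 'log' => '{"level":"info"}', 'namespace' => 'web' })
plain = merge_json_log_sketch({ 'log' => 'not json' })
```

Without a namespace the parsed keys land at the top level of the record; with one, they are grouped under the namespace value, which keeps them from colliding with existing record keys.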
#modify_record(record, mesos_data) ⇒ Object
Injects the mesos framework data into the record and also merges the json log if that configuration is enabled.
Attributes:
-
record- The log record being processed
-
mesos_data- The mesos data retrieved from the docker container
Returns:
-
A record hash that has mesos data and optionally log data added
    # File 'lib/fluent/plugin/filter_mesosphere_filter.rb', line 83
    def modify_record(record, mesos_data)
      modified_record = record.merge(mesos_data)
      modified_record = merge_json_log(modified_record) if @merge_json_log
      modified_record
    end
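One detail worth noting about the first step is key precedence in `Hash#merge`. The sketch below uses invented sample hashes to show it.

```ruby
# Invented sample data for illustration.
record = { 'log' => 'started', 'app' => 'from-record' }
mesos_data = { 'mesos_framework' => 'marathon', 'app' => 'web' }

# Hash#merge returns a new hash; on duplicate keys the argument's value wins.
modified = record.merge(mesos_data)

p modified
```

So a key such as 'app' gathered from the container replaces a same-named key already on the record, while the original record hash is left unchanged.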
#parse_env(env) ⇒ Object
Split the env var on = and return the value
Attributes:
-
env- The docker environment variable to parse to get the value.
Examples
    # For the env value MARATHON_APP_ID the actual string value given to us
    # by docker is 'MARATHON_APP_ID=some-app'. We want to return 'some-app'.
Returns:
-
The value of an environment variable
    # File 'lib/fluent/plugin/filter_mesosphere_filter.rb', line 169
    def parse_env(env)
      env.split('=').last
    end
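A standalone copy of the method makes its behavior easy to check, including the edge case of a value that itself contains an equals sign. The JAVA_OPTS string is an invented example, and the two-part split at the end is a hypothetical alternative shown only for comparison; it is not what the plugin does.

```ruby
# Standalone copy of the one-line method shown above.
def parse_env(env)
  env.split('=').last
end

simple = parse_env('MARATHON_APP_ID=some-app')

# A value that itself contains '=' keeps only its final segment.
nested = parse_env('JAVA_OPTS=-Dfoo=bar')

# Hypothetical alternative (not in the plugin): limit the split to two parts
# so everything after the first '=' is returned.
whole_value = 'JAVA_OPTS=-Dfoo=bar'.split('=', 2).last

puts simple, nested, whole_value
```

For the Marathon and Chronos variables the plugin looks for, values normally contain no equals sign, so the simple split is usually sufficient.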