Class: Fluent::Plugin::ElasticAuditLogMetricOutput
- Inherits: Output
- Inheritance chain: Object → Output → Fluent::Plugin::ElasticAuditLogMetricOutput
- Defined in: lib/fluent/plugin/out_elastic_audit_log_metric.rb
Overview
Output plugin that converts audit log records into metric events.
Constant Summary collapse
- NAME = 'elastic_audit_log_metric'
- ALLOWED_CATEGORIES = %w[GRANTED_PRIVILEGES FAILED_LOGIN].freeze
- DEFAULT_CATEGORIES = %w[GRANTED_PRIVILEGES].freeze
  Doc text: FAILED_LOGIN AUTHENTICATED MISSING_PRIVILEGES SSL_EXCEPTION OPENDISTRO_SECURITY_INDEX_ATTEMPT BAD_HEADERS
- CONFIGURATION_KEYS = %w[category layer request_type cluster user indices r_indices timestamp privilege].freeze
- DEFAULT_CATEGORY_KEY = 'audit_category'
- DEFAULT_LAYER_KEY = 'audit_request_layer'
- DEFAULT_REQUEST_TYPE = 'audit_transport_request_type'
- DEFAULT_CLUSTER_KEY = 'audit_cluster_name'
- DEFAULT_USER_KEY = 'audit_request_effective_user'
- DEFAULT_INDICES_KEY = 'audit_trace_indices'
- DEFAULT_R_INDICES_KEY = 'audit_trace_resolved_indices'
- DEFAULT_REST_REQUEST_PATH = 'audit_rest_request_path'
- DEFAULT_REQUEST_BODY = 'audit_request_body'
- DEFAULT_TIMESTAMP_KEY = '@timestamp'
- DEFAULT_PRIVILEGE_KEY = 'audit_request_privilege'
- DEFAULT_TRACE_TASK_ID_KEY = 'audit_trace_task_id'
- DEFAULT_TRACE_TASK_PARENT_ID_KEY = 'audit_trace_task_parent_id'
- DEFAULT_AGGREGATE_INDEX_CLEAN_SUFFIX = [].freeze
- DEFAULT_AGGREGATE_INTERVAL = nil
- DEFAULT_METADATA_PREFIX = ''
Instance Attribute Summary collapse
- #metric_processor ⇒ Object (readonly): Returns the value of attribute metric_processor.
Instance Method Summary collapse
Instance Attribute Details
#metric_processor ⇒ Object (readonly)
Returns the value of attribute metric_processor.
107 108 109 |
# File 'lib/fluent/plugin/out_elastic_audit_log_metric.rb', line 107 def metric_processor @metric_processor end |
Instance Method Details
#check_configuration_keys ⇒ Object
126 127 128 129 130 131 132 133 134 135 |
# File 'lib/fluent/plugin/out_elastic_audit_log_metric.rb', line 126
#
# Verifies that every "<name>_key" setting derived from CONFIGURATION_KEYS
# holds a usable value. A setting counts as empty when its reader returns
# nil/false or something whose string form is empty.
#
# @raise [Fluent::ConfigError] listing every empty setting at once
# @return [nil] when all settings are populated
def check_configuration_keys
  setting_names = CONFIGURATION_KEYS.map { |name| "#{name}_key" }

  # Keep only the settings whose current value is missing or blank.
  blank_settings = setting_names.reject do |setting|
    current = send(setting)
    current && !current.to_s.empty?
  end

  return if blank_settings.empty?

  raise Fluent::ConfigError, "#{NAME}: #{blank_settings} are empty"
end
#configure(conf) ⇒ Object
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/fluent/plugin/out_elastic_audit_log_metric.rb', line 109
#
# Validates the plugin settings and builds the metric processor.
#
# Steps, in order: delegate to the parent +configure+, require a non-empty
# +tag+, run #check_configuration_keys, drop (with a warning) any configured
# categories that are not in ALLOWED_CATEGORIES, then instantiate the
# processor with this plugin instance as its configuration.
#
# @param conf the configuration object, forwarded implicitly to +super+
# @raise [Fluent::ConfigError] when +tag+ is nil/false or empty, or when
#   #check_configuration_keys finds an empty "<name>_key" setting
# @return [true]
def configure(conf)
  super

  tag_missing = !tag || tag.to_s.empty?
  raise Fluent::ConfigError, "#{NAME}: tag is mandatory" if tag_missing

  check_configuration_keys

  rejected = categories - ALLOWED_CATEGORIES
  if rejected.length.positive?
    # Unsupported categories are not fatal: warn and carry on without them.
    log.warn("#{NAME}: unsupported categories #{rejected}")
    @categories = categories - rejected
  end

  @metric_processor = ElasticLog::AuditLogToMetricProcessor.new(conf: self)
  true
end
#process(es_tag, es) ⇒ Object
137 138 139 140 141 142 143 144 145 |
# File 'lib/fluent/plugin/out_elastic_audit_log_metric.rb', line 137
#
# Turns an incoming event stream into metric records and re-emits them.
#
# The records returned by the metric processor (nothing is emitted when it
# returns nil) are split into groups of at most +event_stream_size+; each
# group is wrapped in its own MultiEventStream and emitted under the
# configured +tag+. Every record is stamped with the same time, captured
# once before processing starts.
#
# @param es_tag tag of the incoming event stream, passed to the processor
# @param es the incoming event stream, passed to the processor
def process(es_tag, es)
  emitted_at = Fluent::EventTime.now

  records = metric_processor.process(es_tag, es)
  records ||= []

  records.each_slice(event_stream_size) do |batch|
    stream = batch.each_with_object(MultiEventStream.new) do |metric, acc|
      acc.add(emitted_at, metric)
    end
    router.emit_stream(tag, stream)
  end
end