Class: Fluent::Plugin::ElasticAuditLogMetricOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_elastic_audit_log_metric.rb

Overview

output plugin

convert audit log to metric events

Constant Summary collapse

NAME =
'elastic_audit_log_metric'
ALLOWED_CATEGORIES =
%w[GRANTED_PRIVILEGES FAILED_LOGIN].freeze
DEFAULT_CATEGORIES =

FAILED_LOGIN AUTHENTICATED MISSING_PRIVILEGES SSL_EXCEPTION OPENDISTRO_SECURITY_INDEX_ATTEMPT BAD_HEADERS

%w[GRANTED_PRIVILEGES].freeze
CONFIGURATION_KEYS =
%w[category layer request_type cluster user indices r_indices timestamp privilege].freeze
DEFAULT_CATEGORY_KEY =
'audit_category'
DEFAULT_LAYER_KEY =
'audit_request_layer'
DEFAULT_REQUEST_TYPE =
'audit_transport_request_type'
DEFAULT_CLUSTER_KEY =
'audit_cluster_name'
DEFAULT_USER_KEY =
'audit_request_effective_user'
DEFAULT_INDICES_KEY =
'audit_trace_indices'
DEFAULT_R_INDICES_KEY =
'audit_trace_resolved_indices'
DEFAULT_REST_REQUEST_PATH =
'audit_rest_request_path'
DEFAULT_REQUEST_BODY =
'audit_request_body'
DEFAULT_TIMESTAMP_KEY =
'@timestamp'
DEFAULT_PRIVILEGE_KEY =
'audit_request_privilege'
DEFAULT_TRACE_TASK_ID_KEY =
'audit_trace_task_id'
DEFAULT_TRACE_TASK_PARENT_ID_KEY =
'audit_trace_task_parent_id'
DEFAULT_AGGREGATE_INDEX_CLEAN_SUFFIX =
[].freeze
DEFAULT_AGGREGATE_INTERVAL =
nil
DEFAULT_METADATA_PREFIX =
''

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#metric_processorObject (readonly)

Returns the value of attribute metric_processor.



107
108
109
# File 'lib/fluent/plugin/out_elastic_audit_log_metric.rb', line 107

def metric_processor
  @metric_processor
end

Instance Method Details

#check_configuration_keysObject

Raises:

  • (Fluent::ConfigError)


126
127
128
129
130
131
132
133
134
135
# File 'lib/fluent/plugin/out_elastic_audit_log_metric.rb', line 126

def check_configuration_keys
  keys = CONFIGURATION_KEYS
  invalid_keys = keys.each_with_object([]) do |key, invalid|
    key_label = "#{key}_key"
    key_value = send(key_label)
    invalid << key_label if !key_value || key_value.to_s.empty?
  end

  raise Fluent::ConfigError, "#{NAME}: #{invalid_keys} are empty" if invalid_keys.any?
end

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/fluent/plugin/out_elastic_audit_log_metric.rb', line 109

def configure(conf)
  super
  raise Fluent::ConfigError, "#{NAME}: tag is mandatory" if !tag || tag.to_s.empty?

  check_configuration_keys

  unsupported_categories = categories - ALLOWED_CATEGORIES
  unless unsupported_categories.empty?
    log.warn("#{NAME}: unsupported categories #{unsupported_categories}")
    @categories = categories - unsupported_categories
  end

  @metric_processor = ElasticLog::AuditLogToMetricProcessor.new(conf: self)

  true
end

#process(es_tag, es) ⇒ Object



137
138
139
140
141
142
143
144
145
# File 'lib/fluent/plugin/out_elastic_audit_log_metric.rb', line 137

def process(es_tag, es)
  time = Fluent::EventTime.now
  metrics = metric_processor.process(es_tag, es) || []
  metrics.each_slice(event_stream_size) do |metrics_slice|
    metrics_es = MultiEventStream.new
    metrics_slice.each { |record| metrics_es.add(time, record) }
    router.emit_stream(tag, metrics_es)
  end
end