Class: LogStash::Codecs::CEF
- Inherits:
- Base
- Object
- Base
- LogStash::Codecs::CEF
- Includes:
- PluginMixins::EventSupport::EventFactoryAdapter
- Defined in:
- lib/logstash/codecs/cef.rb
Overview
Implementation of a Logstash codec for the ArcSight Common Event Format (CEF). Based on Revision 20 of Implementing ArcSight CEF, dated June 05, 2013: community.saas.hpe.com/dcvta86296/attachments/dcvta86296/connector-documentation/1116/1/CommonEventFormatv23.pdf
If this codec receives a payload from an input that is not a valid CEF message, then it will produce an event with the payload as the ‘message’ field and a ‘_cefparsefailure’ tag.
Defined Under Namespace
Classes: CEFField
Constant Summary
- InvalidTimestamp =
Class.new(StandardError)
- HEADER_PATTERN =
A CEF Header is a sequence of zero or more:
  - backslash-escaped pipes; OR
  - backslash-escaped backslashes; OR
  - non-pipe characters
/(?:\\\||\\\\|[^|])*?/
- HEADER_NEXT_FIELD_PATTERN =
Cache of a scanner pattern that captures a HEADER followed by EOF or an unescaped pipe
/(#{HEADER_PATTERN})#{Regexp.quote('|')}/
- HEADER_ESCAPE_CAPTURE =
Cache of a gsub pattern that matches a backslash-escaped backslash or backslash-escaped pipe, capturing the escaped character
/\\([\\|])/
- EXTENSION_KEY_PATTERN =
While the original CEF spec calls out that extension keys must be alphanumeric and must not contain spaces, in practice many “CEF” producers like the Arcsight smart connector produce non-legal keys including underscores, commas, periods, and square-bracketed index offsets.
To support this, we look for a specific sequence of characters that are followed by an equals sign. This pattern will correctly identify all strictly-legal keys, and will also match those that include dot-joined “subkeys” and square-bracketed array indexing.
That sequence must begin with one or more `\w` (word: alphanumeric + underscore), which optionally may be followed by one or more “subkey” sequences and an optional square-bracketed index.
To be understood by this implementation, a “subkey” sequence must consist of a literal dot (`.`) followed by one or more characters that do not convey semantic meaning within CEF (e.g., literal-dot (`.`), literal-equals (`=`), whitespace (`\s`), literal-pipe (`|`), literal-backslash (`\`), or literal-square brackets (`[` or `]`)).
/(?:\w+(?:\.[^\.=\s\|\\\[\]]+)*(?:\[[0-9]+\])?(?==))/
- EXTENSION_KEY_ARRAY_CAPTURE =
Some CEF extension keys seen in the wild use an undocumented array-like syntax that may not be compatible with the Event API’s strict-mode FieldReference parser (e.g., `fieldname[0]`). Cache of a `String#sub` pattern matching array-like syntax and capturing both the base field name and the array-indexing portion so we can convert to a valid FieldReference (e.g., `[fieldname][0]`).
/^([^\[\]]+)((?:\[[0-9]+\])+)$/
- EXTENSION_VALUE_PATTERN =
In extensions, spaces may be included in an extension value without any escaping, so an extension value is a sequence of zero or more:
  - non-whitespace characters; OR
  - runs of whitespace that are NOT followed by something that looks like a key-equals sequence
/(?:\S|\s++(?!#{EXTENSION_KEY_PATTERN}=))*/
- EXTENSION_NEXT_KEY_VALUE_PATTERN =
Cache of a pattern that captures the NEXT extension field key/value pair
/^(#{EXTENSION_KEY_PATTERN})=(#{EXTENSION_VALUE_PATTERN})\s*/
- CEF_PREFIX =
'CEF:'.freeze
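The header patterns above can be exercised outside Logstash. The following is a minimal sketch, not the codec's own code: the three regular expressions are copied from the constant summary, while `scan_header_fields` and the sample line are hypothetical names introduced only for illustration.

```ruby
# Patterns copied verbatim from the constant summary.
HEADER_PATTERN = /(?:\\\||\\\\|[^|])*?/
HEADER_NEXT_FIELD_PATTERN = /(#{HEADER_PATTERN})#{Regexp.quote('|')}/
HEADER_ESCAPE_CAPTURE = /\\([\\|])/

# Hypothetical helper (not part of the codec): peel off `count`
# pipe-terminated header fields, unescape each one, and return the
# fields together with whatever text remains after the last pipe.
def scan_header_fields(line, count = 7)
  fields = []
  rest = line
  count.times do
    match = HEADER_NEXT_FIELD_PATTERN.match(rest)
    raise ArgumentError, "found #{fields.size} of #{count} header fields" if match.nil?

    # Turn `\|` into `|` and `\\` into `\`.
    fields << match[1].gsub(HEADER_ESCAPE_CAPTURE, '\1')
    rest = match.post_match
  end
  [fields, rest]
end

# Sample input (an assumption): the product name contains an escaped pipe.
fields, extensions = scan_header_fields('CEF:0|Acme|Gate\|Keeper|1.0|100|Login|5|src=10.0.0.1')
puts fields.inspect
puts extensions
```

In this sketch the escaped pipe stays inside the third field, and everything after the seventh pipe is left over for extension parsing.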
Instance Method Summary
- #decode(data, &block) ⇒ Object
- #encode(event) ⇒ Object
- #flush(&block) ⇒ Object
- #handle(data, &block) ⇒ Object
- #initialize(params = {}) ⇒ CEF (constructor): A new instance of CEF.
Constructor Details
#initialize(params = {}) ⇒ CEF
Returns a new instance of CEF.
# File 'lib/logstash/codecs/cef.rb', line 187

def initialize(params={})
  super(params)

  # CEF input MUST be UTF-8, per the CEF White Paper that serves as the format's specification:
  # https://web.archive.org/web/20160422182529/https://kc.mcafee.com/resources/sites/MCAFEE/content/live/CORP_KNOWLEDGEBASE/78000/KB78712/en_US/CEF_White_Paper_20100722.pdf
  @utf8_charset = LogStash::Util::Charset.new('UTF-8')
  @utf8_charset.logger = self.logger

  if @delimiter
    # Logstash configuration doesn't have built-in support for escaping,
    # so we implement it here. Feature discussion for escaping is here:
    #   https://github.com/elastic/logstash/issues/1645
    @delimiter = @delimiter.gsub("\\r", "\r").gsub("\\n", "\n")
    @buffer = FileWatch::BufferedTokenizer.new(@delimiter)
  end

  require_relative 'cef/timestamp_normalizer'
  @timestamp_normalizer = TimestampNormalizer.new(locale: @locale, timezone: @default_timezone)

  generate_header_fields!
  generate_mappings!
end
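The delimiter handling in the constructor turns the two-character sequences `\r` and `\n`, as typed in a configuration file, into real control characters. A minimal sketch of that step in isolation, where `unescape_delimiter` is a hypothetical helper name and not something the codec defines:

```ruby
# Hypothetical helper mirroring the gsub chain used in #initialize.
def unescape_delimiter(configured)
  configured.gsub("\\r", "\r").gsub("\\n", "\n")
end

# A single-quoted literal keeps the backslashes, as a config value would.
configured = '\r\n'
delimiter = unescape_delimiter(configured)

puts configured.bytesize # four bytes: backslash, r, backslash, n
puts delimiter.bytesize  # two bytes: carriage return, line feed
```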
Instance Method Details
#decode(data, &block) ⇒ Object
# File 'lib/logstash/codecs/cef.rb', line 211

def decode(data, &block)
  if @delimiter
    @logger.trace("Buffering #{data.bytesize}B of data") if @logger.trace?
    @buffer.extract(data).each do |line|
      @logger.trace("Decoding #{line.bytesize + @delimiter.bytesize}B of buffered data") if @logger.trace?
      handle(line, &block)
    end
  else
    @logger.trace("Decoding #{data.bytesize}B of unbuffered data") if @logger.trace?
    handle(data, &block)
  end
end
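When a delimiter is configured, `#decode` only hands complete, delimiter-terminated entries to `#handle`, and `#flush` later drains whatever partial entry is still buffered. The sketch below illustrates that contract with `SketchTokenizer`, a simplified stand-in written for this example; it is an assumption about the behavior relied on here, not the real `FileWatch::BufferedTokenizer`.

```ruby
# Simplified stand-in for a buffered tokenizer (hypothetical class).
class SketchTokenizer
  def initialize(delimiter)
    @delimiter = delimiter
    @pending = +''
  end

  # Append a chunk and return every complete entry seen so far.
  # The trailing partial entry (possibly empty) stays buffered.
  def extract(chunk)
    @pending << chunk
    parts = @pending.split(@delimiter, -1)
    @pending = parts.pop || +''
    parts
  end

  # Return the buffered remainder and reset the buffer.
  def flush
    remainder = @pending
    @pending = +''
    remainder
  end
end

tokenizer = SketchTokenizer.new("\n")
first = tokenizer.extract("CEF:0|a|b\nCEF:0|c") # second entry is still incomplete
second = tokenizer.extract("|d\ntail")          # completes the second entry
leftover = tokenizer.flush                      # drains the unterminated tail

puts first.inspect
puts second.inspect
puts leftover.inspect
```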
#encode(event) ⇒ Object
# File 'lib/logstash/codecs/cef.rb', line 330

def encode(event)
  # "CEF:0|Elasticsearch|Logstash|1.0|Signature|Name|Sev|"

  vendor = sanitize_header_field(event.sprintf(@vendor))
  vendor = self.class.get_config["vendor"][:default] if vendor.empty?

  product = sanitize_header_field(event.sprintf(@product))
  product = self.class.get_config["product"][:default] if product.empty?

  version = sanitize_header_field(event.sprintf(@version))
  version = self.class.get_config["version"][:default] if version.empty?

  signature = sanitize_header_field(event.sprintf(@signature))
  signature = self.class.get_config["signature"][:default] if signature.empty?

  name = sanitize_header_field(event.sprintf(@name))
  name = self.class.get_config["name"][:default] if name.empty?

  severity = sanitize_severity(event, @severity)

  # Should also probably set the fields sent
  header = ["CEF:0", vendor, product, version, signature, name, severity].join("|")
  values = @fields.map { |fieldname| get_value(fieldname, event) }.compact.join(" ")

  @on_event.call(event, "#{header}|#{values}#{@delimiter}")
end
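The shape of the encoded line can be sketched without Logstash: seven pipe-joined header fields, one more pipe, then space-joined `key=value` extensions. In the sketch below, `escape_header_field` is a hypothetical stand-in for `sanitize_header_field` (whose body is not shown on this page), and `build_cef_line` is likewise an illustrative name; the escaping rule is an assumption chosen as the inverse of HEADER_ESCAPE_CAPTURE.

```ruby
# Hypothetical stand-in for sanitize_header_field: backslash-escape
# backslashes and pipes so they cannot be mistaken for separators.
def escape_header_field(value)
  value.to_s.gsub(/[\\|]/) { |char| "\\#{char}" }
end

# Hypothetical helper: assemble "CEF:0|...|extensions" from six header
# values (vendor, product, version, signature, name, severity) and a
# hash of extension key/value pairs.
def build_cef_line(header_values, extensions)
  header = ['CEF:0', *header_values.map { |v| escape_header_field(v) }].join('|')
  values = extensions.map { |key, value| "#{key}=#{value}" }.join(' ')
  "#{header}|#{values}"
end

line = build_cef_line(
  ['Elasticsearch', 'Logstash', '1.0', 'Signature', 'Name', '6'],
  { 'src' => '10.0.0.1' }
)
puts line
```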
#flush(&block) ⇒ Object
# File 'lib/logstash/codecs/cef.rb', line 224

def flush(&block)
  if @delimiter && (remainder = @buffer.flush)
    @logger.trace("Flushing #{remainder.bytesize}B of buffered data") if @logger.trace?
    handle(remainder, &block) unless remainder.empty?
  end
end
#handle(data, &block) ⇒ Object
# File 'lib/logstash/codecs/cef.rb', line 231

def handle(data, &block)
  original_data = data.dup
  event = event_factory.new_event
  event.set(raw_data_field, data) unless raw_data_field.nil?

  @utf8_charset.convert(data)

  # Several of the many operations in the rest of this method will fail when they encounter UTF8-tagged strings
  # that contain invalid byte sequences; fail early to avoid wasted work.
  fail('invalid byte sequence in UTF-8') unless data.valid_encoding?

  # Strip any quotations at the start and end, flex connectors seem to send this
  if data[0] == "\""
    data = data[1..-2]
  end

  # Use a scanning parser to capture the HEADER_FIELDS
  unprocessed_data = data.chomp
  if unprocessed_data.include?(LITERAL_NEWLINE)
    fail("message is not valid CEF because it contains unescaped newline characters; " +
         "use the `delimiter` setting to enable in-codec buffering and delimiter-splitting")
  end
  @header_fields.each_with_index do |field_name, idx|
    match_data = HEADER_NEXT_FIELD_PATTERN.match(unprocessed_data)
    if match_data.nil?
      fail("message is not valid CEF; found #{idx} of 7 required pipe-terminated header fields")
    end

    escaped_field_value = match_data[1]
    next if escaped_field_value.nil?

    # process legal header escape sequences
    unescaped_field_value = escaped_field_value.gsub(HEADER_ESCAPE_CAPTURE, '\1')

    event.set(field_name, unescaped_field_value)
    unprocessed_data = match_data.post_match
  end

  # Remainder is message
  message = unprocessed_data

  # Try and parse out the syslog header if there is one
  cef_version_field = @header_fields[0]
  if (cef_version = event.get(cef_version_field)).include?(' ')
    split_cef_version = cef_version.rpartition(' ')
    event.set(@syslog_header, split_cef_version[0])
    event.set(cef_version_field, split_cef_version[2])
  end

  # Get rid of the CEF bit in the version
  event.set(cef_version_field, delete_cef_prefix(event.get(cef_version_field)))

  # Use a scanning parser to capture the Extension Key/Value Pairs
  if message && !message.empty?
    message = message.strip
    extension_fields = {}

    while (match = message.match(EXTENSION_NEXT_KEY_VALUE_PATTERN))
      extension_field_key, raw_extension_field_value = match.captures
      message = match.post_match

      # expand abbreviated extension field keys
      extension_field_key = @decode_mapping.fetch(extension_field_key, extension_field_key)

      # convert extension field name to strict legal field_reference, fixing field names with ambiguous array-like syntax
      extension_field_key = extension_field_key.sub(EXTENSION_KEY_ARRAY_CAPTURE, '[\1]\2') if extension_field_key.end_with?(']')

      # process legal extension field value escapes
      extension_field_value = desanitize_extension_val(raw_extension_field_value)

      extension_fields[extension_field_key] = extension_field_value
    end
    if !message.empty?
      fail("invalid extensions; keyless value present `#{message}`")
    end

    # in ECS mode, normalize timestamps including timezone.
    if ecs_compatibility != :disabled
      device_timezone = extension_fields['[event][timezone]']
      @timestamp_fields.each do |timestamp_field_name|
        raw_timestamp = extension_fields.delete(timestamp_field_name) or next
        value = normalize_timestamp(raw_timestamp, device_timezone)
        event.set(timestamp_field_name, value)
      end
    end

    extension_fields.each do |field_key, field_value|
      event.set(field_key, field_value)
    end
  end

  yield event
rescue => e
  @logger.error("Failed to decode CEF payload. Generating failure event with payload in message field.",
                log_metadata(:original_data => original_data))
  yield event_factory.new_event("message" => data, "tags" => ["_cefparsefailure"])
end
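The extension-scanning loop in `#handle` can be tried on its own. The sketch below copies the four extension patterns from the constant summary and repeats the loop's core steps; `parse_extensions` is a hypothetical helper, and the key-abbreviation mapping and value unescaping performed by the codec are deliberately left out because their implementations are not shown on this page.

```ruby
# Patterns copied verbatim from the constant summary.
EXTENSION_KEY_PATTERN = /(?:\w+(?:\.[^\.=\s\|\\\[\]]+)*(?:\[[0-9]+\])?(?==))/
EXTENSION_KEY_ARRAY_CAPTURE = /^([^\[\]]+)((?:\[[0-9]+\])+)$/
EXTENSION_VALUE_PATTERN = /(?:\S|\s++(?!#{EXTENSION_KEY_PATTERN}=))*/
EXTENSION_NEXT_KEY_VALUE_PATTERN = /^(#{EXTENSION_KEY_PATTERN})=(#{EXTENSION_VALUE_PATTERN})\s*/

# Hypothetical helper: repeatedly match the NEXT key/value pair at the
# start of the remaining text, as the loop in #handle does.
def parse_extensions(text)
  rest = text.strip
  fields = {}
  while (match = rest.match(EXTENSION_NEXT_KEY_VALUE_PATTERN))
    key, value = match.captures
    rest = match.post_match

    # Rewrite array-like keys such as `name[0]` to `[name][0]`.
    key = key.sub(EXTENSION_KEY_ARRAY_CAPTURE, '[\1]\2') if key.end_with?(']')
    fields[key] = value
  end

  # Anything left over could not be read as `key=value`.
  raise ArgumentError, "invalid extensions; keyless value present `#{rest}`" unless rest.empty?

  fields
end

# Sample input (an assumption): an unescaped multi-word value and an
# array-style key.
parsed = parse_extensions('src=10.0.0.1 msg=User logged in ad.field[0]=x')
puts parsed.inspect
```

Whitespace inside `msg` is kept because the words that follow it do not look like a key followed by an equals sign, while the whitespace before `ad.field[0]=` ends the value.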