Class: LogStash::Codecs::CEF

Inherits:
Base
  • Object
Includes:
PluginMixins::EventSupport::EventFactoryAdapter
Defined in:
lib/logstash/codecs/cef.rb

Overview

Implementation of a Logstash codec for the ArcSight Common Event Format (CEF), based on Revision 20 of Implementing ArcSight CEF, dated June 05, 2013: community.saas.hpe.com/dcvta86296/attachments/dcvta86296/connector-documentation/1116/1/CommonEventFormatv23.pdf

If this codec receives a payload from an input that is not a valid CEF message, then it will produce an event with the payload as the ‘message’ field and a ‘_cefparsefailure’ tag.

Defined Under Namespace

Classes: CEFField

Constant Summary

InvalidTimestamp =
Class.new(StandardError)
HEADER_PATTERN =

A CEF Header is a sequence of zero or more:

- backslash-escaped pipes; OR
- backslash-escaped backslashes; OR
- non-pipe characters
/(?:\\\||\\\\|[^|])*?/
HEADER_NEXT_FIELD_PATTERN =

Cache of a scanner pattern that captures a single header field (a HEADER_PATTERN match) terminated by an unescaped pipe

/(#{HEADER_PATTERN})#{Regexp.quote('|')}/
HEADER_ESCAPE_CAPTURE =

Cache of a gsub pattern that matches a backslash-escaped backslash or backslash-escaped pipe, capturing the escaped character

/\\([\\|])/
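To make the three header constants concrete, here is a minimal Ruby sketch of the header-scanning loop that #handle runs: match HEADER_NEXT_FIELD_PATTERN, unescape the capture with HEADER_ESCAPE_CAPTURE, and continue from the post-match. The helper name `scan_header_fields` is hypothetical, and the count of 7 is taken from the error message in #handle; event creation, logging and syslog-header handling are left out.

```ruby
# Constants copied from the documentation above.
HEADER_PATTERN = /(?:\\\||\\\\|[^|])*?/
HEADER_NEXT_FIELD_PATTERN = /(#{HEADER_PATTERN})#{Regexp.quote('|')}/
HEADER_ESCAPE_CAPTURE = /\\([\\|])/

# Hypothetical helper: take `count` pipe-terminated header fields off the front
# of `line`, unescaping \| and \\ in each one. Returns [fields, remainder].
def scan_header_fields(line, count = 7)
  fields = []
  rest = line
  count.times do
    match = HEADER_NEXT_FIELD_PATTERN.match(rest)
    raise ArgumentError, "found #{fields.size} of #{count} header fields" if match.nil?
    fields << match[1].gsub(HEADER_ESCAPE_CAPTURE, '\1')
    rest = match.post_match
  end
  [fields, rest]
end

# In the single-quoted literal, 'Fire\|wall' is backslash-pipe and
# 'Port\\\\scan' is two backslashes, i.e. a CEF-escaped backslash.
fields, extensions = scan_header_fields('CEF:0|Acme|Fire\|wall|1.0|100|Port\\\\scan|5|src=10.0.0.1')
# fields     == ["CEF:0", "Acme", "Fire|wall", "1.0", "100", "Port\\scan", "5"]
# extensions == "src=10.0.0.1"
```

Because the header pattern is lazy, each match stops at the first pipe that is not consumed by a `\|` or `\\` escape, so `Fire\|wall` survives as a single field and only then has its escape removed.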
EXTENSION_KEY_PATTERN =

While the original CEF spec calls out that extension keys must be alphanumeric and must not contain spaces, in practice many “CEF” producers, such as the ArcSight SmartConnector, produce non-legal keys that include underscores, commas, periods, and square-bracketed index offsets.

To support this, we look for a specific sequence of characters that is followed by an equals sign. This pattern correctly identifies all strictly-legal keys, and also matches keys that include dot-joined “subkeys” and square-bracketed array indexing.

That sequence must begin with one or more `\w` (word: alphanumeric + underscore), which may optionally be followed by one or more “subkey” sequences and an optional square-bracketed index.

To be understood by this implementation, a “subkey” sequence must consist of a literal dot (`.`) followed by one or more characters that carry no semantic meaning within CEF; that is, any character other than a literal dot (`.`), literal equals (`=`), whitespace (`\s`), literal pipe (`|`), literal backslash (`\`), or literal square brackets (`[` or `]`).

/(?:\w+(?:\.[^\.=\s\|\\\[\]]+)*(?:\[[0-9]+\])?(?==))/
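A quick way to see which keys the pattern accepts is to anchor it at the start of a string. In this sketch, `KEY_AT_START` and `leading_key` are hypothetical names; the regex itself is copied from the constant above. The trailing `(?==)` lookahead means a candidate only counts as a key when an equals sign follows it.

```ruby
EXTENSION_KEY_PATTERN = /(?:\w+(?:\.[^\.=\s\|\\\[\]]+)*(?:\[[0-9]+\])?(?==))/

# Hypothetical: anchor the pattern so only a key at the very start is reported.
KEY_AT_START = /\A#{EXTENSION_KEY_PATTERN}/

# Returns the key that starts `text`, or nil when no key=... is present there.
def leading_key(text)
  match = KEY_AT_START.match(text)
  match && match[0]
end

leading_key("src=10.0.0.1")   # => "src"         (strictly legal key)
leading_key("ad.Foo,Bar=1")   # => "ad.Foo,Bar"  (dot-joined subkey; commas allowed)
leading_key("cs1[0]=x")       # => "cs1[0]"      (square-bracketed index)
leading_key("bad key=v")      # => nil           (whitespace before "=")
leading_key("src")            # => nil           (no "=" follows)
```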
EXTENSION_KEY_ARRAY_CAPTURE =

Some CEF extension keys seen in the wild use an undocumented array-like syntax that may not be compatible with the Event API’s strict-mode FieldReference parser (e.g., `fieldname[0]`). This constant caches a `String#sub` pattern that matches the array-like syntax and captures both the base field name and the array-indexing portion, so that the key can be converted to a valid FieldReference (e.g., `[fieldname][0]`).

/^([^\[\]]+)((?:\[[0-9]+\])+)$/
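The conversion can be tried in isolation. The sketch below mirrors the one-line `sub` in #handle, where only keys ending in `]` are touched; `to_field_reference` is a hypothetical name. Because the pattern requires at least one non-bracket character at the start, a key that is already a bracketed FieldReference, such as the `[event][timezone]` name that #handle looks up, passes through unchanged.

```ruby
EXTENSION_KEY_ARRAY_CAPTURE = /^([^\[\]]+)((?:\[[0-9]+\])+)$/

# Hypothetical helper mirroring the conversion step in #handle.
def to_field_reference(key)
  return key unless key.end_with?(']')
  key.sub(EXTENSION_KEY_ARRAY_CAPTURE, '[\1]\2')
end

to_field_reference("fieldname[0]")       # => "[fieldname][0]"
to_field_reference("matrix[1][2]")       # => "[matrix][1][2]"
to_field_reference("src")                # => "src"
to_field_reference("[event][timezone]")  # => "[event][timezone]" (no match, unchanged)
```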
EXTENSION_VALUE_PATTERN =

In extensions, spaces may be included in an extension value without any escaping, so an extension value is a sequence of zero or more:

  • a non-whitespace character; OR

  • a run of whitespace that is NOT followed by something that looks like a key-equals sequence

/(?:\S|\s++(?!#{EXTENSION_KEY_PATTERN}=))*/
EXTENSION_NEXT_KEY_VALUE_PATTERN =

Cache of a pattern that captures the NEXT extension field key/value pair

/^(#{EXTENSION_KEY_PATTERN})=(#{EXTENSION_VALUE_PATTERN})\s*/
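The key/value loop can be reproduced with the three extension constants alone. This sketch follows the `while` loop in #handle; the `scan_extensions` helper is hypothetical, and it skips abbreviation expansion, array-key conversion and value unescaping, since `@decode_mapping` and `desanitize_extension_val` are not shown on this page.

```ruby
EXTENSION_KEY_PATTERN = /(?:\w+(?:\.[^\.=\s\|\\\[\]]+)*(?:\[[0-9]+\])?(?==))/
EXTENSION_VALUE_PATTERN = /(?:\S|\s++(?!#{EXTENSION_KEY_PATTERN}=))*/
EXTENSION_NEXT_KEY_VALUE_PATTERN = /^(#{EXTENSION_KEY_PATTERN})=(#{EXTENSION_VALUE_PATTERN})\s*/

# Hypothetical helper: peel key=value pairs off the front of the extension
# string, as #handle does, and reject leftover text that has no key.
def scan_extensions(message)
  fields = {}
  rest = message.strip
  while (match = rest.match(EXTENSION_NEXT_KEY_VALUE_PATTERN))
    key, value = match.captures
    fields[key] = value
    rest = match.post_match
  end
  raise ArgumentError, "keyless value present `#{rest}`" unless rest.empty?
  fields
end

parsed = scan_extensions("src=10.0.0.1 msg=Port scan detected dpt=22")
# parsed == {"src" => "10.0.0.1", "msg" => "Port scan detected", "dpt" => "22"}

trailing = scan_extensions("src=1 orphan")
# trailing == {"src" => "1 orphan"}

begin
  scan_extensions("orphan src=1")
rescue ArgumentError => e
  puts e.message  # prints: keyless value present `orphan src=1`
end
```

The second call shows a consequence of the value rule: a trailing word that is not followed by `=` is absorbed into the preceding value rather than reported as an error. Only text that cannot begin with a key, as in the third call, is rejected.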
CEF_PREFIX =
'CEF:'.freeze

Instance Method Summary

Constructor Details

#initialize(params = {}) ⇒ CEF

Returns a new instance of CEF.



# File 'lib/logstash/codecs/cef.rb', line 187

def initialize(params={})
  super(params)

  # CEF input MUST be UTF-8, per the CEF White Paper that serves as the format's specification:
  # https://web.archive.org/web/20160422182529/https://kc.mcafee.com/resources/sites/MCAFEE/content/live/CORP_KNOWLEDGEBASE/78000/KB78712/en_US/CEF_White_Paper_20100722.pdf
  @utf8_charset = LogStash::Util::Charset.new('UTF-8')
  @utf8_charset.logger = self.logger

  if @delimiter
    # Logstash configuration doesn't have built-in support for escaping,
    # so we implement it here. Feature discussion for escaping is here:
    #   https://github.com/elastic/logstash/issues/1645
    @delimiter = @delimiter.gsub("\\r", "\r").gsub("\\n", "\n")
    @buffer = FileWatch::BufferedTokenizer.new(@delimiter)
  end

  require_relative 'cef/timestamp_normalizer'
  @timestamp_normalizer = TimestampNormalizer.new(locale: @locale, timezone: @default_timezone)

  generate_header_fields!
  generate_mappings!
end
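The delimiter handling above only translates the two-character sequences `\r` and `\n`. Assuming, as the source comment says, that the configured string arrives without escape processing, this sketch applies the same `gsub` chain (the `unescape_delimiter` name is hypothetical) and shows that other sequences such as `\t` are left alone.

```ruby
# Same gsub chain #initialize applies to the `delimiter` setting: only the
# literal two-character sequences \r and \n become CR and LF.
def unescape_delimiter(delimiter)
  delimiter.gsub("\\r", "\r").gsub("\\n", "\n")
end

crlf = unescape_delimiter('\r\n')  # 4 characters in, 2 out: CR followed by LF
tab  = unescape_delimiter('\t')    # unchanged: still backslash followed by "t"
```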

Instance Method Details

#decode(data, &block) ⇒ Object



# File 'lib/logstash/codecs/cef.rb', line 211

def decode(data, &block)
  if @delimiter
    @logger.trace("Buffering #{data.bytesize}B of data") if @logger.trace?
    @buffer.extract(data).each do |line|
      @logger.trace("Decoding #{line.bytesize + @delimiter.bytesize}B of buffered data") if @logger.trace?
      handle(line, &block)
    end
  else
    @logger.trace("Decoding #{data.bytesize}B of unbuffered data") if @logger.trace?
    handle(data, &block)
  end
end
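When `delimiter` is set, #decode and #flush rely on a buffered tokenizer: `extract` returns complete, delimiter-stripped lines (which is why the trace message adds `@delimiter.bytesize` back), and `flush` hands over whatever is left. The `TinyTokenizer` class below is a hypothetical stand-in that imitates only those two calls; it is not FileWatch::BufferedTokenizer and makes no claim about that class's internals.

```ruby
# Hypothetical stand-in for FileWatch::BufferedTokenizer, covering only the two
# calls the codec makes: #extract (complete tokens) and #flush (unterminated tail).
class TinyTokenizer
  def initialize(delimiter)
    # A regex avoids String#split's special case for a single-space string.
    @pattern = Regexp.new(Regexp.escape(delimiter))
    @buffer = +""
  end

  # Append data and return every delimiter-terminated token (delimiter removed);
  # whatever follows the last delimiter stays buffered for the next call.
  def extract(data)
    @buffer << data
    tokens = @buffer.split(@pattern, -1)
    @buffer = tokens.pop || +""
    tokens
  end

  # Return and clear the unterminated remainder.
  def flush
    remainder = @buffer
    @buffer = +""
    remainder
  end
end

tok = TinyTokenizer.new("\n")
lines = []
lines.concat(tok.extract("CEF:0|a|b|c|d|e|f|x=1\nCEF:0|a|b"))
lines.concat(tok.extract("|c|d|e|f|x=2\n"))
leftover = tok.flush
# lines    == ["CEF:0|a|b|c|d|e|f|x=1", "CEF:0|a|b|c|d|e|f|x=2"]
# leftover == ""  (so the codec's #flush would not call #handle)
```

A message split across two reads is reassembled before it is handed on, which is why the codec can safely run the header scanner on each extracted line.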

#encode(event) ⇒ Object



# File 'lib/logstash/codecs/cef.rb', line 330

def encode(event)
  # "CEF:0|Elasticsearch|Logstash|1.0|Signature|Name|Sev|"

  vendor = sanitize_header_field(event.sprintf(@vendor))
  vendor = self.class.get_config["vendor"][:default] if vendor.empty?

  product = sanitize_header_field(event.sprintf(@product))
  product = self.class.get_config["product"][:default] if product.empty?

  version = sanitize_header_field(event.sprintf(@version))
  version = self.class.get_config["version"][:default] if version.empty?

  signature = sanitize_header_field(event.sprintf(@signature))
  signature = self.class.get_config["signature"][:default] if signature.empty?

  name = sanitize_header_field(event.sprintf(@name))
  name = self.class.get_config["name"][:default] if name.empty?

  severity = sanitize_severity(event, @severity)

  # Should also probably set the fields sent
  header = ["CEF:0", vendor, product, version, signature, name, severity].join("|")
  values = @fields.map { |fieldname| get_value(fieldname, event) }.compact.join(" ")

  @on_event.call(event, "#{header}|#{values}#{@delimiter}")
end
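The header assembly in #encode can be sketched without a Logstash event. In this sketch the `event.sprintf` lookups are replaced by a plain Hash, and `escape_header_field` is a hypothetical stand-in for `sanitize_header_field` (whose body is not shown on this page). It escapes only backslashes and pipes, the two characters HEADER_ESCAPE_CAPTURE unescapes on the decode side. The default values are placeholders, not necessarily the codec's configured defaults.

```ruby
HEADER_KEYS = %i[vendor product version signature name].freeze

# Hypothetical: escape backslash and pipe so the decoder can split the header
# on unescaped pipes. The codec's real sanitize_header_field may do more.
def escape_header_field(value)
  value.to_s.gsub(/[\\|]/) { |char| "\\#{char}" }
end

# Mirrors the shape of #encode: empty fields fall back to a default, then
# "CEF:0", the five fields and the severity are pipe-joined, followed by a
# pipe and the extension string.
def build_cef_line(values, defaults, severity, extensions)
  header_fields = HEADER_KEYS.map do |key|
    field = escape_header_field(values[key])
    field.empty? ? defaults.fetch(key) : field
  end
  header = ["CEF:0", *header_fields, severity].join("|")
  "#{header}|#{extensions}"
end

defaults = { vendor: "Elasticsearch", product: "Logstash", version: "1.0",
             signature: "Logstash", name: "Logstash" } # placeholder defaults
line = build_cef_line({ vendor: "Acme", product: "Fire|wall" }, defaults, 5, "src=10.0.0.1")
puts line  # prints: CEF:0|Acme|Fire\|wall|1.0|Logstash|Logstash|5|src=10.0.0.1
```

Using the block form of `gsub` keeps the replacement literal, so the escaping does not depend on how `gsub` interprets backslashes in replacement strings.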

#flush(&block) ⇒ Object



# File 'lib/logstash/codecs/cef.rb', line 224

def flush(&block)
  if @delimiter && (remainder = @buffer.flush)
    @logger.trace("Flushing #{remainder.bytesize}B of buffered data") if @logger.trace?
    handle(remainder, &block) unless remainder.empty?
  end
end

#handle(data, &block) ⇒ Object



# File 'lib/logstash/codecs/cef.rb', line 231

def handle(data, &block)
  original_data = data.dup
  event = event_factory.new_event
  event.set(raw_data_field, data) unless raw_data_field.nil?

  @utf8_charset.convert(data)

  # Several of the many operations in the rest of this method will fail when they encounter UTF8-tagged strings
  # that contain invalid byte sequences; fail early to avoid wasted work.
  fail('invalid byte sequence in UTF-8') unless data.valid_encoding?

  # Strip any quotation marks at the start and end; flex connectors seem to send them
  if data[0] == "\""
    data = data[1..-2]
  end

  # Use a scanning parser to capture the HEADER_FIELDS
  unprocessed_data = data.chomp
  if unprocessed_data.include?(LITERAL_NEWLINE)
    fail("message is not valid CEF because it contains unescaped newline characters; " +
         "use the `delimiter` setting to enable in-codec buffering and delimiter-splitting")
  end
  @header_fields.each_with_index do |field_name, idx|
    match_data = HEADER_NEXT_FIELD_PATTERN.match(unprocessed_data)
    if match_data.nil?
      fail("message is not valid CEF; found #{idx} of 7 required pipe-terminated header fields")
    end

    escaped_field_value = match_data[1]
    next if escaped_field_value.nil?

    # process legal header escape sequences
    unescaped_field_value = escaped_field_value.gsub(HEADER_ESCAPE_CAPTURE, '\1')

    event.set(field_name, unescaped_field_value)
    unprocessed_data = match_data.post_match
  end

  # Remainder is the message (the extension key/value pairs)
  message = unprocessed_data

  # Try and parse out the syslog header if there is one
  cef_version_field = @header_fields[0]
  if (cef_version = event.get(cef_version_field)).include?(' ')
    split_cef_version = cef_version.rpartition(' ')
    event.set(@syslog_header, split_cef_version[0])
    event.set(cef_version_field, split_cef_version[2])
  end

  # Get rid of the CEF bit in the version
  event.set(cef_version_field, delete_cef_prefix(event.get(cef_version_field)))

  # Use a scanning parser to capture the Extension Key/Value Pairs
  if message && !message.empty?
    message = message.strip
    extension_fields = {}

    while (match = message.match(EXTENSION_NEXT_KEY_VALUE_PATTERN))
      extension_field_key, raw_extension_field_value = match.captures
      message = match.post_match

      # expand abbreviated extension field keys
      extension_field_key = @decode_mapping.fetch(extension_field_key, extension_field_key)

      # convert extension field name to strict legal field_reference, fixing field names with ambiguous array-like syntax
      extension_field_key = extension_field_key.sub(EXTENSION_KEY_ARRAY_CAPTURE, '[\1]\2') if extension_field_key.end_with?(']')

      # process legal extension field value escapes
      extension_field_value = desanitize_extension_val(raw_extension_field_value)

      extension_fields[extension_field_key] = extension_field_value
    end
    if !message.empty?
      fail("invalid extensions; keyless value present `#{message}`")
    end

    # in ECS mode, normalize timestamps including timezone.
    if ecs_compatibility != :disabled
      device_timezone = extension_fields['[event][timezone]']
      @timestamp_fields.each do |timestamp_field_name|
        raw_timestamp = extension_fields.delete(timestamp_field_name) or next
        value = normalize_timestamp(raw_timestamp, device_timezone)
        event.set(timestamp_field_name, value)
      end
    end

    extension_fields.each do |field_key, field_value|
      event.set(field_key, field_value)
    end
  end

  yield event
rescue => e
  @logger.error("Failed to decode CEF payload. Generating failure event with payload in message field.",
                :exception => e.message, :original_data => original_data)
  yield event_factory.new_event("message" => data, "tags" => ["_cefparsefailure"])
end
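Two small steps in #handle are easy to miss: the first header field may carry a syslog prefix, which is split off at the last space with `rpartition`, and the `CEF:` prefix is then removed from the version. The sketch below reproduces both. `split_version_field` is hypothetical, and `delete_cef_prefix` here is an assumed equivalent of the codec's helper of the same name, whose body is not shown on this page.

```ruby
CEF_PREFIX = 'CEF:'.freeze

# Assumed equivalent of the codec's delete_cef_prefix (body not shown here).
def delete_cef_prefix(version)
  version.delete_prefix(CEF_PREFIX)
end

# Hypothetical helper: split a first header field such as
# "Jan 18 11:07:53 host CEF:0" the way #handle does; everything before the
# LAST space is the syslog header. Returns [syslog_header_or_nil, version].
def split_version_field(first_field)
  syslog_header = nil
  version = first_field
  if version.include?(' ')
    syslog_header, _separator, version = version.rpartition(' ')
  end
  [syslog_header, delete_cef_prefix(version)]
end

split_version_field("Jan 18 11:07:53 host CEF:0")  # => ["Jan 18 11:07:53 host", "0"]
split_version_field("CEF:1")                        # => [nil, "1"]
```

Splitting at the last space rather than the first keeps multi-word syslog headers intact, since the CEF version itself never contains a space.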