Class: Fluent::Plugin::CloudwatchIngestParser

Inherits:
RegexpParser
  • Object
show all
Defined in:
lib/fluent/plugin/parser_cloudwatch_ingest.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ CloudwatchIngestParser

Returns a new instance of CloudwatchIngestParser.



18
19
20
# File 'lib/fluent/plugin/parser_cloudwatch_ingest.rb', line 18

# Builds the parser. No state is set up here; the bare `super` simply
# hands construction over to the parent class's initializer.
def initialize
  super
end

Instance Method Details

#configure(conf) ⇒ Object



22
23
24
# File 'lib/fluent/plugin/parser_cloudwatch_ingest.rb', line 22

# Applies the plugin configuration by delegating to the parent class.
#
# @param conf the configuration object; the bare `super` forwards it
#   unchanged to the parent implementation
def configure(conf)
  super
end

#parse(event, group, stream) {|time, record| ... } ⇒ Object

Yields:

  • (time, record)


26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/fluent/plugin/parser_cloudwatch_ingest.rb', line 26

# Parses a single log event and yields the resulting time and record.
#
# The event's message is first handed to the parent parser. The result is
# then post-processed according to instance flags (presumably plugin
# configuration declared elsewhere in the class):
#
# * @parse_json_body          - parse record['message'] as JSON and copy
#                               its top-level pairs into the record
# * @fail_on_unparsable_json  - on a JSON parse error, yield (nil, nil)
#                               instead of passing the record through
# * @inject_group_name        - store +group+ under 'log_group_name'
# * @inject_stream_name       - store +stream+ under 'log_stream_name'
# * @event_time               - replace the parsed time with one built
#                               from event.timestamp
#
# @param event [#message, #timestamp] the log event; +timestamp+ is
#   treated as a whole number of milliseconds
# @param group  value injected as 'log_group_name' when enabled
# @param stream value injected as 'log_stream_name' when enabled
# @yield [time, record] the final time and record, or (nil, nil) when the
#   JSON body is unparsable and @fail_on_unparsable_json is set
def parse(event, group, stream)
  time = nil
  record = nil
  super(event.message) do |t, r|
    time = t
    record = r
  end

  # record is still nil if the parent parser yielded no record; skip the
  # record mutations in that case rather than calling []= on nil.
  if record
    # Optionally attempt to parse the body as json
    if @parse_json_body
      begin
        # Whilst we could just merge! the parsed
        # message into the record we'd bork on
        # nested keys. Force level one Strings.
        json_body = MultiJson.load(record['message'])

        # Valid JSON need not be an object (e.g. a bare number or nil);
        # only key/value bodies can be merged into the record.
        if json_body.respond_to?(:each_pair)
          json_body.each_pair do |k, v|
            record[k.to_s] = v.to_s
          end
        end
      rescue MultiJson::ParseError
        if @fail_on_unparsable_json
          yield nil, nil
          return
        end
      end
    end

    # Inject optional fields
    record['log_group_name'] = group if @inject_group_name
    record['log_stream_name'] = stream if @inject_stream_name
  end

  if @event_time
    # Split milliseconds into whole seconds and the millisecond remainder
    # using integer arithmetic only, which avoids floating point rounding
    # errors and works for timestamps of any digit count.
    event_s, event_ms = event.timestamp.to_i.divmod(1_000)
    time = Fluent::EventTime.new(event_s, event_ms * 1_000_000)
  end

  yield time, record
end