Class: Fluent::Plugin::MutateFilter

Inherits:
Filter
  • Object
show all
Defined in:
lib/fluent/plugin/filter_mutate.rb

Constant Summary collapse

VALID_CONVERSIONS =

Convert valid types

%w(string integer float boolean datetime)
VALID_PARSERS =

Parser valid types

%w(json)
CONVERT_PREFIX =

Convert helper method prefix

"convert_".freeze
TRUE_REGEX =

Convert boolean regex

(/^(true|t|yes|y|1)$/i).freeze
FALSE_REGEX =
(/^(false|f|no|n|0)$/i).freeze
ENVIRONMENT_TAG_REGEXP =

Environment placeholder regex (matches %e{...})

/%e\{[^}]+\}/
TEMPLATE_TAG_REGEXP =

Template placeholder regex (matches %{...})

/%\{[^}]+\}/

Instance Method Summary collapse

Constructor Details

#initialize ⇒ MutateFilter

Returns a new instance of MutateFilter.



21
22
23
24
25
# File 'lib/fluent/plugin/filter_mutate.rb', line 21

# Returns a new instance of MutateFilter.
#
# Defers to the parent class initializer, then starts with an empty list of
# actions; #configure appends one entry per validated mutator section.
def initialize
  super

  @actions = []
end

Instance Method Details

#configure(conf) ⇒ NilClass

Initialize attributes and parameters

Returns:

  • (NilClass)

Since:

  • 0.1.0



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/fluent/plugin/filter_mutate.rb', line 30

# Initialize attributes and parameters
#
# Walks every entry of @mutators, validates its settings according to the
# mutator's "@type", normalises the values in place (booleans, gsub
# pattern/replacement pairs) and appends the result to @actions.
#
# @param conf the plugin configuration; forwarded unchanged to the parent
#   class through the bare `super`
# @raise [Fluent::ConfigError] when "@type" is missing, names a method this
#   object does not respond to, or a setting fails its per-type validation
#   (including a gsub pattern that is not a valid regular expression)
# @since 0.1.0
#
# NOTE(review): the published signature says NilClass, but the last
# expression evaluated is `@mutators.each`; callers should not rely on the
# return value.
def configure(conf)
  super

  @mutators.each do |mutator|
    section = mutator.corresponding_config_element

    type = section["@type"]
    data = section.to_h.tap { |h| h.delete("@type") }

    unless type
      raise Fluent::ConfigError, "Missing '@type' parameter in <mutator>"
    end

    # The mutator name must match a method on this object; the second
    # argument makes respond_to? consider private methods as well.
    unless respond_to?(type.to_sym, true)
      raise Fluent::ConfigError, "Invalid mutator #{type}"
    end

    # Iterate over section keys to remove access warnings, we'll be
    # iterating over the data which has been dumped to array later
    data.keys.each do |key|
      section.has_key?(key)
    end

    # Validate config section types
    case type
    when "convert"
      data.each do |key, value|
        next if VALID_CONVERSIONS.include?(value)

        raise Fluent::ConfigError, "mutate #{type} action " \
              "received an invalid type for #{key}, should be one " \
              "of #{VALID_CONVERSIONS.join(', ')}."
      end
    when "parse"
      data.each do |key, value|
        if key == "@merge_root"
          # NOTE(review): unlike the boolean-only mutators below, an
          # unrecognised value here is not rejected - presumably bool_value
          # yields nil for it (the nil check in that branch suggests so);
          # confirm whether that leniency is intended.
          data[key] = Fluent::Config.bool_value(value)
          next
        end

        next if VALID_PARSERS.include?(value)

        raise Fluent::ConfigError, "mutate #{type} action " \
              "received an invalid type for #{key}, should be one " \
              "of #{VALID_PARSERS.join(', ')}."
      end
    when "lowercase", "uppercase", "remove", "strip"
      data.each do |key, value|
        v = Fluent::Config.bool_value(value)
        if v.nil?
          raise Fluent::ConfigError, "mutate #{type} action " \
                "requires boolean values, received '#{value}' " \
                "for #{key}."
        end
        data[key] = v
      end
    when "gsub"
      data.each do |key, value|
        v = Fluent::Config::ARRAY_TYPE.call(value, {value_type: :string})
        unless v.is_a?(Array) && v.length == 2
          raise Fluent::ConfigError, "mutate #{type} action " \
                "requires array values, representing " \
                "[pattern, replacement] for #{key}, " \
                "received '#{value}'"
        end

        pattern, replacement = v

        # A pattern containing "%{" is stored as a plain string rather than
        # compiled here; every other pattern is compiled once, up front.
        compiled =
          if pattern.include?("%{")
            pattern
          else
            begin
              Regexp.new(pattern)
            rescue RegexpError => e
              # Report a malformed pattern as a configuration problem, like
              # every other validation failure in this method.
              raise Fluent::ConfigError, "mutate #{type} action " \
                    "received an invalid pattern for #{key}: #{e.message}"
            end
          end

        data[key] = { pattern: compiled, replacement: replacement }
      end
    end

    @actions << {
      "@type": type,
      "data": data
    }
  end
end

#filter(tag, time, record) ⇒ Hash

Filter action which will manipulate records

Returns:

  • (Hash)

    the modified event record

Since:

  • 0.1.0



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/fluent/plugin/filter_mutate.rb', line 137

# Filter action which will manipulate records
#
# Runs every configured action, in order, against a wrapped copy of the
# record. An action that raises is logged as a warning and skipped; the
# remaining actions still run.
#
# @param tag the event tag, stored on the wrapper as event_tag
# @param time the event time, stored on the wrapper as an integer (to_i)
# @param record the incoming event record
# @return [Hash] the modified event record
# @since 0.1.0
def filter(tag, time, record)
  # The wrapper makes the record easier to navigate; `expand_nesting`
  # decides whether periods in field names act as field separators.
  event = Fluent::Plugin::Mixin::MutateEvent.new(
    record,
    expand_nesting: @expand_nesting
  )
  event.event_time = time.to_i
  event.event_tag = tag

  @actions.each do |action|
    mutator, settings = action.values_at(:@type, :data)

    begin
      # Each mutator is a method on this object, invoked by name.
      send(mutator.to_sym, settings, event)
    rescue => e
      log.warn "failed to mutate #{action} action", error: e
      log.warn_backtrace
    end
  end

  if @prune_empty
    event.prune
  end

  event.to_record
end