Class: Fluent::Plugin::Parser

Inherits:
Base
  • Object
show all
Includes:
OwnedByMixin, TimeMixin::Parser
Defined in:
lib/fluent/plugin/parser.rb

Defined Under Namespace

Classes: ParserError

Constant Summary collapse

AVAILABLE_PARSER_VALUE_TYPES =
['string', 'integer', 'float', 'bool', 'time', 'array']
PARSER_TYPES =
[:text_per_line, :text, :binary]
TRUTHY_VALUES =
['true', 'yes', '1']

Constants included from Configurable

Configurable::CONFIG_TYPE_REGISTRY

Instance Attribute Summary collapse

Attributes inherited from Base

#under_plugin_development

Instance Method Summary collapse

Methods included from TimeMixin::Parser

included, #time_parser_create

Methods included from OwnedByMixin

#log, #owner, #owner=

Methods inherited from Base

#after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #close, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #has_router?, #initialize, #inspect, #multi_workers_ready?, #plugin_root_dir, #shutdown, #shutdown?, #start, #started?, #stop, #stopped?, #string_safe_encoding, #terminate, #terminated?

Methods included from SystemConfig::Mixin

#system_config, #system_config_override

Methods included from Configurable

#config, #configure_proxy_generate, #configured_section_create, included, #initialize, lookup_type, register_type

Constructor Details

This class inherits a constructor from Fluent::Plugin::Base

Instance Attribute Details

#type_converters ⇒ Object (readonly)

for tests



54
55
56
# File 'lib/fluent/plugin/parser.rb', line 54

# Reader for @type_converters; the page documents it as existing for tests.
#
# @return [Object] whatever is currently stored in @type_converters
def type_converters
  @type_converters
end

Instance Method Details

#build_type_converters(types) ⇒ Object



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/fluent/plugin/parser.rb', line 145

# Builds one converter lambda per configured field.
#
# Each type definition is either "type" or "type:option", where type must be
# listed in AVAILABLE_PARSER_VALUE_TYPES. The option is only consulted for:
#
# * time  - colon-separated "[timezone:]time_format"; time_format is either a
#           name from Fluent::TimeMixin::TIME_TYPES (e.g. unixtime/float) or a
#           string time format
# * array - the delimiter to split on (defaults to ",")
#
# @param types [#each_pair, nil] field name => type definition
# @return [Hash, nil] field name => lambda taking the raw value; nil when
#   +types+ is nil/false
# @raise [Fluent::ConfigError] when a definition names an unknown type
def build_type_converters(types)
  return nil unless types

  # Turns the option part of a 'time' definition into a time parser.
  time_parser_for = lambda do |spec|
    next time_parser_create(type: :string, format: nil, timezone: nil) unless spec

    tz, fmt = spec.split(':', 2)
    unless Fluent::Timezone.validate(tz)
      # The leading token is not a timezone, so the whole option is the format.
      tz = nil
      fmt = spec
    end

    kind = 'string' # estimate
    if Fluent::TimeMixin::TIME_TYPES.include?(fmt)
      # The "format" is actually a time type name (unixtime/float).
      kind = fmt
      fmt = nil
    end

    time_parser_create(type: kind.to_sym, format: fmt, timezone: tz)
  end

  table = {}

  types.each_pair do |field_name, type_definition|
    type, option = type_definition.split(":", 2)
    unless AVAILABLE_PARSER_VALUE_TYPES.include?(type)
      raise Fluent::ConfigError, "unknown value conversion for key:'#{field_name}', type:'#{type}'"
    end

    table[field_name] =
      case type
      when 'string'
        lambda { |v| v.to_s }
      when 'integer'
        lambda do |v|
          begin
            v.to_i
          rescue
            # Values without a usable #to_i go through their string form.
            v.to_s.to_i
          end
        end
      when 'float'
        lambda do |v|
          begin
            v.to_f
          rescue
            # Values without a usable #to_f go through their string form.
            v.to_s.to_f
          end
        end
      when 'bool'
        lambda { |v| TRUTHY_VALUES.include?(v.to_s.downcase) }
      when 'time'
        # The parser is created once, at build time, and captured by the lambda.
        parser = time_parser_for.call(option)
        lambda do |v|
          begin
            parser.parse(v)
          rescue
            # Any parse failure yields nil instead of propagating.
            nil
          end
        end
      when 'array'
        separator = option ? option.to_s : ','
        lambda { |v| string_safe_encoding(v.to_s) { |s| s.split(separator) } }
      else
        raise "BUG: unknown type even after check: #{type}"
      end
  end

  table
end

#call(*a, &b) ⇒ Object



74
75
76
77
78
# File 'lib/fluent/plugin/parser.rb', line 74

# Compatibility entry point: forwards every argument and the block to #parse
# and returns whatever #parse returns.
#
# @param a [Array] positional arguments passed through to #parse
# @param b [Proc] block passed through to #parse
def call(*a, &b)
  # Keep backward compatibility for existing plugins
  # TODO: warn when deprecated
  parse(*a, &b)
end

#configure(conf) ⇒ Object



61
62
63
64
65
66
67
68
# File 'lib/fluent/plugin/parser.rb', line 61

# Runs the inherited configuration, then prepares the state used by
# #parse_time and #convert_values.
#
# @param conf the configuration object; handed to the parent implementation
#   through the bare +super+ call
def configure(conf)
  super

  # Parser later used by #parse_time for the value found under @time_key.
  @time_parser = time_parser_create
  # Stays nil/false when no @null_value_pattern is set; otherwise the compiled pattern.
  @null_value_regexp = @null_value_pattern && Regexp.new(@null_value_pattern)
  @type_converters = build_type_converters(@types)
  # Truthy as soon as any conversion feature is enabled; #convert_values
  # returns its arguments untouched when this is falsy.
  @execute_convert_values = @type_converters || @null_value_regexp || @null_empty_string
end

#convert_values(time, record) ⇒ Object

Typical use from a parser's #parse implementation:

    def parse(text, &block)
      time, record = convert_values(time, record)
      yield time, record
    end



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/fluent/plugin/parser.rb', line 119

# Applies null-value replacement and per-field type conversion to +record+
# in place.
#
# For every key whose value is truthy:
# * a String that #string_like_null accepts is replaced by nil;
# * otherwise, if a converter is registered for the key in @type_converters,
#   the value is replaced by the converter's result.
# nil/false values are always left as-is. Nothing happens at all when
# @execute_convert_values is falsy.
#
# @param time the event time; returned unchanged
# @param record [#each_key, #[], #[]=] the record to convert (mutated)
# @return [Array] the two-element array [time, record]
def convert_values(time, record)
  if @execute_convert_values
    record.each_key do |field|
      current = record[field]
      next unless current

      if current.is_a?(String) && string_like_null(current)
        record[field] = nil
      elsif @type_converters && @type_converters.has_key?(field)
        record[field] = @type_converters[field].call(current)
      end
    end
  end

  [time, record]
end

#implement?(feature) ⇒ Boolean

Returns:

  • (Boolean)


80
81
82
83
84
85
86
87
88
# File 'lib/fluent/plugin/parser.rb', line 80

# Tells whether the plugin's own class defines one of the optional parser APIs.
#
# Only methods defined directly on the instance's class are considered
# (instance_methods(false)), so inherited definitions do not count.
#
# @param feature [Symbol] :parse_io or :parse_partial_data
# @return [Boolean]
# @raise [ArgumentError] for any other feature
def implement?(feature)
  unless %i[parse_io parse_partial_data].include?(feature)
    raise ArgumentError, "Unknown feature for parser plugin: #{feature}"
  end

  self.class.instance_methods(false).include?(feature)
end

#parse(text, &block) ⇒ Object

Raises:

  • (NotImplementedError)


70
71
72
# File 'lib/fluent/plugin/parser.rb', line 70

# Abstract parsing entry point; this base implementation always raises.
#
# @param text the input to parse (not used here)
# @param block [Proc] not used here
# @raise [NotImplementedError] always, asking child classes to implement it
def parse(text, &block)
  raise NotImplementedError, "Implement this method in child class"
end

#parse_io(io, &block) ⇒ Object

Raises:

  • (NotImplementedError)


90
91
92
# File 'lib/fluent/plugin/parser.rb', line 90

# Optional API placeholder; this base implementation always raises.
# #implement?(:parse_io) reports whether a plugin class defines its own version.
#
# @param io not used here
# @param block [Proc] not used here
# @raise [NotImplementedError] always
def parse_io(io, &block)
  raise NotImplementedError, "Optional API #parse_io is not implemented"
end

#parse_partial_data(data, &block) ⇒ Object

Raises:

  • (NotImplementedError)


94
95
96
# File 'lib/fluent/plugin/parser.rb', line 94

# Optional API placeholder; this base implementation always raises.
# #implement?(:parse_partial_data) reports whether a plugin class defines its
# own version.
#
# @param data not used here
# @param block [Proc] not used here
# @raise [NotImplementedError] always
def parse_partial_data(data, &block)
  raise NotImplementedError, "Optional API #parse_partial_data is not implemented"
end

#parse_time(record) ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/fluent/plugin/parser.rb', line 98

# Determines the event time for +record+.
#
# When @time_key is set and the record (anything responding to #has_key?)
# contains that key, the value is parsed with @time_parser. The key is
# removed from the record unless @keep_time_key is set. Without a usable
# time field the result is Fluent::EventTime.now when
# @estimate_current_event is set, otherwise nil.
#
# @param record the parsed record; may be mutated (time key deleted)
# @return the parsed time, the current event time, or nil
# @raise [ParserError] when the time parser raises
#   Fluent::TimeParser::TimeParseError (same message)
def parse_time(record)
  has_time_field = @time_key && record.respond_to?(:has_key?) && record.has_key?(@time_key)
  return(@estimate_current_event ? Fluent::EventTime.now : nil) unless has_time_field

  raw_time = @keep_time_key ? record[@time_key] : record.delete(@time_key)
  @time_parser.parse(raw_time)
rescue Fluent::TimeParser::TimeParseError => e
  raise ParserError, e.message
end

#parser_type ⇒ Object



57
58
59
# File 'lib/fluent/plugin/parser.rb', line 57

# Reports the kind of input this parser handles.
#
# @return [Symbol] :text_per_line here, which is one of the values listed in
#   PARSER_TYPES
def parser_type
  :text_per_line
end

#string_like_null(value, null_empty_string = @null_empty_string, null_value_regexp = @null_value_regexp) ⇒ Object



139
140
141
# File 'lib/fluent/plugin/parser.rb', line 139

# Decides whether a string value should be treated as null.
#
# A value matches when empty-string nulling is enabled and the value is
# empty, or when a null pattern is given and the value matches it. The
# pattern match runs inside #string_safe_encoding.
#
# @param value [String] the value to inspect
# @param null_empty_string whether empty strings count as null
#   (defaults to @null_empty_string)
# @param null_value_regexp [Regexp, nil] pattern marking null values
#   (defaults to @null_value_regexp)
# @return a truthy value (true or the pattern match result) when the value
#   is null-like, otherwise a falsy value
def string_like_null(value, null_empty_string = @null_empty_string, null_value_regexp = @null_value_regexp)
  empty_hit = null_empty_string && value.empty?
  return empty_hit if empty_hit

  null_value_regexp && string_safe_encoding(value) { |s| null_value_regexp.match(s) }
end