Class: Fluent::Plugin::Parser
- Includes:
- OwnedByMixin, TimeMixin::Parser
- Defined in:
- lib/fluent/plugin/parser.rb
Direct Known Subclasses
Compat::Parser, Apache2Parser, CSVParser, InHttpParser, JSONParser, LabeledTSVParser, MessagePackParser, MultilineParser, NoneParser, RegexpParser, SyslogParser, TSVParser
Defined Under Namespace
Classes: ParserError
Constant Summary collapse
- AVAILABLE_PARSER_VALUE_TYPES =
['string', 'integer', 'float', 'bool', 'time', 'array']
- PARSER_TYPES =
[:text_per_line, :text, :binary]
- TRUTHY_VALUES =
['true', 'yes', '1']
Constants included from Configurable
Configurable::CONFIG_TYPE_REGISTRY
Instance Attribute Summary collapse
-
#type_converters ⇒ Object
readonly
for tests.
Attributes inherited from Base
Instance Method Summary collapse
- #build_type_converters(types) ⇒ Object
- #call(*a, &b) ⇒ Object
- #configure(conf) ⇒ Object
-
#convert_values(time, record) ⇒ Object
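Applies null-value handling and per-field type conversion to a parsed record. Intended use inside a subclass's #parse: `def parse(text, &block); time, record = convert_values(time, record); yield time, record; end`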
- #implement?(feature) ⇒ Boolean
- #parse(text, &block) ⇒ Object
- #parse_io(io, &block) ⇒ Object
- #parse_partial_data(data, &block) ⇒ Object
- #parse_time(record) ⇒ Object
- #parser_type ⇒ Object
- #string_like_null(value, null_empty_string = @null_empty_string, null_value_regexp = @null_value_regexp) ⇒ Object
Methods included from TimeMixin::Parser
Methods included from OwnedByMixin
Methods inherited from Base
#after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #close, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #has_router?, #initialize, #inspect, #multi_workers_ready?, #plugin_root_dir, #shutdown, #shutdown?, #start, #started?, #stop, #stopped?, #string_safe_encoding, #terminate, #terminated?
Methods included from SystemConfig::Mixin
#system_config, #system_config_override
Methods included from Configurable
#config, #configure_proxy_generate, #configured_section_create, included, #initialize, lookup_type, register_type
Constructor Details
This class inherits a constructor from Fluent::Plugin::Base
Instance Attribute Details
#type_converters ⇒ Object (readonly)
for tests
54 55 56 |
# File 'lib/fluent/plugin/parser.rb', line 54

# Reader for the converter table stored in @type_converters (assigned in
# #configure from #build_type_converters). Exposed for tests.
#
# @return [Hash, nil] whatever is currently stored in @type_converters
def type_converters
  @type_converters
end
Instance Method Details
#build_type_converters(types) ⇒ Object
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 |
# File 'lib/fluent/plugin/parser.rb', line 145

# Builds a table of per-field value converters from a type specification.
#
# Each definition is "type" or "type:option"; only the first ':' separates
# the type from its option, so the option itself may contain ':'.
#
# @param types [Hash, nil] field name => type definition string
# @return [Hash, nil] field name => lambda taking the raw value;
#   nil when +types+ is nil/false
# @raise [Fluent::ConfigError] when the type is not listed in
#   AVAILABLE_PARSER_VALUE_TYPES
def build_type_converters(types)
  return nil unless types

  converters = {}

  types.each_pair do |field_name, type_definition|
    type, option = type_definition.split(":", 2)
    unless AVAILABLE_PARSER_VALUE_TYPES.include?(type)
      raise Fluent::ConfigError, "unknown value conversion for key:'#{field_name}', type:'#{type}'"
    end

    conv = case type
           when 'string' then ->(v){ v.to_s }
           # Fall back to converting the string form when the value itself cannot be converted.
           when 'integer' then ->(v){ v.to_i rescue v.to_s.to_i }
           when 'float' then ->(v){ v.to_f rescue v.to_s.to_f }
           # Case-insensitive match against TRUTHY_VALUES; everything else is false.
           when 'bool' then ->(v){ TRUTHY_VALUES.include?(v.to_s.downcase) }
           when 'time'
             # colon-separated option: time:[timezone:]time_format
             # time_format is unixtime/float/string-time-format
             timep = if option
                       time_type = 'string' # estimate
                       timezone, time_format = option.split(':', 2)
                       unless Fluent::Timezone.validate(timezone)
                         # The leading segment is not a timezone: the whole option is the format.
                         timezone, time_format = nil, option
                       end
                       if Fluent::TimeMixin::TIME_TYPES.include?(time_format)
                         time_type, time_format = time_format, nil # unixtime/float
                       end
                       time_parser_create(type: time_type.to_sym, format: time_format, timezone: timezone)
                     else
                       time_parser_create(type: :string, format: nil, timezone: nil)
                     end
             # A value that fails to parse is converted to nil instead of raising.
             ->(v){ timep.parse(v) rescue nil }
           when 'array'
             delimiter = option ? option.to_s : ','
             ->(v){ string_safe_encoding(v.to_s){|s| s.split(delimiter) } }
           else
             raise "BUG: unknown type even after check: #{type}"
           end

    converters[field_name] = conv
  end

  converters
end
#call(*a, &b) ⇒ Object
74 75 76 77 78 |
# File 'lib/fluent/plugin/parser.rb', line 74

# Forwards all arguments and the block to #parse.
#
# @return [Object] whatever #parse returns
def call(*a, &b)
  # Keep backward compatibility for existing plugins
  # TODO: warn when deprecated
  parse(*a, &b)
end
#configure(conf) ⇒ Object
61 62 63 64 65 66 67 68 |
# File 'lib/fluent/plugin/parser.rb', line 61

# Runs the inherited configuration, then prepares the state that
# #parse_time and #convert_values read.
#
# @param conf [Object] configuration passed through to the superclass
def configure(conf)
  super

  @time_parser = time_parser_create
  # Compile the pattern only when one is set; otherwise the falsy value is kept.
  @null_value_regexp = @null_value_pattern && Regexp.new(@null_value_pattern)
  @type_converters = build_type_converters(@types)
  # Truthy when any conversion is configured; #convert_values returns early otherwise.
  @execute_convert_values = @type_converters || @null_value_regexp || @null_empty_string
end
#convert_values(time, record) ⇒ Object
def parse(text, &block)
time, record = convert_values(time, record)
yield time, record
end
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/fluent/plugin/parser.rb', line 119

# Applies null handling and per-field type conversion to +record+ in place.
#
# Typical use from a subclass:
#   def parse(text, &block)
#     time, record = convert_values(time, record)
#     yield time, record
#   end
#
# @param time [Object] passed through unchanged
# @param record [Hash] parsed record; its values are overwritten in place
# @return [Array] the pair [time, record]
def convert_values(time, record)
  return time, record unless @execute_convert_values

  record.each_key do |key|
    value = record[key]
    next unless value # nil/null value is always left as-is.

    # String values recognised as null become nil and skip type conversion.
    if value.is_a?(String) && string_like_null(value)
      record[key] = nil
      next
    end

    if @type_converters && @type_converters.has_key?(key)
      record[key] = @type_converters[key].call(value)
    end
  end

  return time, record
end
#implement?(feature) ⇒ Boolean
80 81 82 83 84 85 86 87 88 |
# File 'lib/fluent/plugin/parser.rb', line 80

# Reports whether the receiver's own class defines an optional parser API.
#
# Only methods defined directly on the class are considered
# (instance_methods(false)), so inherited definitions do not count.
#
# @param feature [Symbol] :parse_io or :parse_partial_data
# @return [Boolean]
# @raise [ArgumentError] for any other feature name
def implement?(feature)
  methods_of_plugin = self.class.instance_methods(false)
  case feature
  when :parse_io then methods_of_plugin.include?(:parse_io)
  when :parse_partial_data then methods_of_plugin.include?(:parse_partial_data)
  else
    raise ArgumentError, "Unknown feature for parser plugin: #{feature}"
  end
end
#parse(text, &block) ⇒ Object
70 71 72 |
# File 'lib/fluent/plugin/parser.rb', line 70

# Placeholder that subclasses must override.
#
# @raise [NotImplementedError] always, in this base implementation
def parse(text, &block)
  raise NotImplementedError, "Implement this method in child class"
end
#parse_io(io, &block) ⇒ Object
90 91 92 |
# File 'lib/fluent/plugin/parser.rb', line 90

# Optional API; this base implementation only signals that it is missing.
#
# @raise [NotImplementedError] always, in this base implementation
def parse_io(io, &block)
  raise NotImplementedError, "Optional API #parse_io is not implemented"
end
#parse_partial_data(data, &block) ⇒ Object
94 95 96 |
# File 'lib/fluent/plugin/parser.rb', line 94

# Optional API; this base implementation only signals that it is missing.
#
# @raise [NotImplementedError] always, in this base implementation
def parse_partial_data(data, &block)
  raise NotImplementedError, "Optional API #parse_partial_data is not implemented"
end
#parse_time(record) ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/fluent/plugin/parser.rb', line 98

# Extracts the event time for +record+.
#
# When @time_key is set and present in the record, its value is parsed with
# @time_parser; the key is removed from the record unless @keep_time_key is
# set. Otherwise the current time is returned when @estimate_current_event
# is set, and nil when it is not.
#
# @param record [Hash, Object] parsed record; may lose its time key
# @return [Object, nil] parsed time, Fluent::EventTime.now, or nil
# @raise [ParserError] when the time value cannot be parsed
def parse_time(record)
  if @time_key && record.respond_to?(:has_key?) && record.has_key?(@time_key)
    src = if @keep_time_key
            record[@time_key]
          else
            record.delete(@time_key)
          end
    @time_parser.parse(src)
  elsif @estimate_current_event
    Fluent::EventTime.now
  else
    nil
  end
rescue Fluent::TimeParser::TimeParseError => e
  # Re-raise as the parser's own error type, keeping the original message.
  raise ParserError, e.message
end
#parser_type ⇒ Object
57 58 59 |
# File 'lib/fluent/plugin/parser.rb', line 57

# Parser type reported by this base class.
#
# @return [Symbol] always :text_per_line here
def parser_type
  :text_per_line
end
#string_like_null(value, null_empty_string = @null_empty_string, null_value_regexp = @null_value_regexp) ⇒ Object
139 140 141 |
# File 'lib/fluent/plugin/parser.rb', line 139

# Tells whether a string value should be treated as null.
#
# @param value [String] value to inspect
# @param null_empty_string [Boolean, nil] treat "" as null when truthy
#   (defaults to @null_empty_string)
# @param null_value_regexp [Regexp, nil] values matching it are null
#   (defaults to @null_value_regexp)
# @return [Object] truthy when the value counts as null (true or the match
#   result), falsy (false or nil) otherwise
def string_like_null(value, null_empty_string = @null_empty_string, null_value_regexp = @null_value_regexp)
  (null_empty_string && value.empty?) ||
    (null_value_regexp && string_safe_encoding(value){|s| null_value_regexp.match(s) })
end