Class: LogStash::Filters::CSV

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/filters/csv.rb

Overview

The CSV filter takes an event field containing CSV data, parses it, and stores it as individual fields (can optionally specify the names). This filter can also parse data with any separator, not just commas.

Constant Summary collapse

CONVERTERS =
{
  :integer => lambda do |value|
    CSV::Converters[:integer].call(value)
  end,

  :float => lambda do |value|
    CSV::Converters[:float].call(value)
  end,

  :date => lambda do |value|
    result = CSV::Converters[:date].call(value)
    result.is_a?(Date) ? LogStash::Timestamp.new(result.to_time) : result
  end,

  :date_time => lambda do |value|
    result = CSV::Converters[:date_time].call(value)
    result.is_a?(DateTime) ? LogStash::Timestamp.new(result.to_time) : result
  end,

  :boolean => lambda do |value|
     value = value.strip.downcase
     return false if value == "false"
     return true  if value == "true"
     return value
  end
}

Instance Method Summary collapse

Instance Method Details

#filter(event) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/logstash/filters/csv.rb', line 127

def filter(event)
  @logger.debug? && @logger.debug("Running csv filter", :event => event)

  if (source = event.get(@source))
    begin

      values = CSV.parse_line(source, :col_sep => @separator, :quote_char => @quote_char)        

      if (@autodetect_column_names && @columns.empty?)
        @columns = values
        event.cancel
        return
      end

      if (@skip_header && (!@columns.empty?) && (@columns == values))
        event.cancel
        return
      end

      if(@skip_empty_rows && values.nil?)
        # applies tag to empty rows, users can cancel event referencing this tag in an 'if' conditional statement
        event.tag("_csvskippedemptyfield")
        return
      end

      values.each_index do |i|
        unless (@skip_empty_columns && (values[i].nil? || values[i].empty?))
          unless ignore_field?(i)
            field_name = @columns[i] || "column#{i + 1}"
            event.set(field_ref(field_name), transform(field_name, values[i]))
          end
        end
      end

      filter_matched(event)
    rescue => e
      event.tag("_csvparsefailure")
      @logger.warn("Error parsing csv", :field => @source, :source => source, :exception => e)
      return
    end
  end

  @logger.debug? && @logger.debug("Event after csv filter", :event => event)
end

#registerObject

Raises:

  • (LogStash::ConfigurationError)


105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/logstash/filters/csv.rb', line 105

def register
  # validate conversion types to be the valid ones.
  bad_types = @convert.values.select do |type|
    !CONVERTERS.has_key?(type.to_sym)
  end.uniq

  raise(LogStash::ConfigurationError, "Invalid conversion types: #{bad_types.join(', ')}") unless bad_types.empty?

  # @convert_symbols contains the symbolized types to avoid symbol conversion in the transform method
  @convert_symbols = @convert.inject({}){|result, (k, v)| result[k] = v.to_sym; result}

  # make sure @target is in the format [field name] if defined, i.e. surrounded by brakets
  @target = "[#{@target}]" if @target && !@target.start_with?("[")
  
  # if the zero byte character is entered in the config, set the value
  if (@quote_char == "\\x00")
    @quote_char = "\x00"
  end
  
  @logger.debug? && @logger.debug("CSV parsing options", :col_sep => @separator, :quote_char => @quote_char)
end