Class: LogStash::Codecs::CSV

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/codecs/csv.rb

Constant Summary collapse

CONVERTERS =
{
  :integer => lambda do |value|
    CSV::Converters[:integer].call(value)
  end,

  :float => lambda do |value|
    CSV::Converters[:float].call(value)
  end,

  :date => lambda do |value|
    result = CSV::Converters[:date].call(value)
    result.is_a?(Date) ? LogStash::Timestamp.new(result.to_time) : result
  end,

  :date_time => lambda do |value|
    result = CSV::Converters[:date_time].call(value)
    result.is_a?(DateTime) ? LogStash::Timestamp.new(result.to_time) : result
  end,

  :boolean => lambda do |value|
     value = value.strip.downcase
     return false if value == "false"
     return true  if value == "true"
     return value
  end
}

Instance Method Summary collapse

Instance Method Details

#decode(data) ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/logstash/codecs/csv.rb', line 111

def decode(data)
  data = @converter.convert(data)
  begin
    values = CSV.parse_line(data, :col_sep => @separator, :quote_char => @quote_char)

    if (@autodetect_column_names && @columns.empty?)
      @columns = values
      @logger.debug? && @logger.debug("Auto detected the following columns", :columns => @columns.inspect)
      return
    end

    decoded = {}
    values.each_index do |i|
      unless (@skip_empty_columns && (values[i].nil? || values[i].empty?))
        unless ignore_field?(i)
          field_name = @columns[i] || "column#{i + 1}"
          decoded[field_name] = transform(field_name, values[i])
        end
      end
    end

    yield LogStash::Event.new(decoded)
  rescue CSV::MalformedCSVError => e
    @logger.error("CSV parse failure. Falling back to plain-text", :error => e, :data => data)
    yield LogStash::Event.new("message" => data, "tags" => ["_csvparsefailure"])
  end
end

#encode(event) ⇒ Object



139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/logstash/codecs/csv.rb', line 139

def encode(event)
  if @include_headers
    csv_data = CSV.generate_line(select_keys(event), :col_sep => @separator, :quote_char => @quote_char, :headers => true)
    @on_event.call(event, csv_data)

    # output headers only once per codec lifecycle
    @include_headers = false
  end

  csv_data = CSV.generate_line(select_values(event), :col_sep => @separator, :quote_char => @quote_char)
  @on_event.call(event, csv_data)
end

#registerObject

Raises:

  • (LogStash::ConfigurationError)


90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/logstash/codecs/csv.rb', line 90

def register
  @converter = LogStash::Util::Charset.new(@charset)
  @converter.logger = @logger

  # validate conversion types to be the valid ones.
  bad_types = @convert.values.select do |type|
    !CONVERTERS.has_key?(type.to_sym)
  end.uniq
  raise(LogStash::ConfigurationError, "Invalid conversion types: #{bad_types.join(', ')}") unless bad_types.empty?

  # @convert_symbols contains the symbolized types to avoid symbol conversion in the transform method
  @convert_symbols = @convert.each_with_object({}){|(k, v), result| result[k] = v.to_sym}

  # if the zero byte character is entered in the config, set the value
  if (@quote_char == "\\x00")
    @quote_char = "\x00"
  end

  @logger.debug? && @logger.debug("CSV parsing options", :col_sep => @separator, :quote_char => @quote_char)
end