Class: LogStash::Codecs::Netflow

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/codecs/netflow.rb

Overview

The “netflow” codec is used for decoding Netflow v5/v9/v10 (IPFIX) flows.

Supported Netflow/IPFIX exporters

The following Netflow/IPFIX exporters are known to work with the most recent version of the netflow codec:

[cols="6,^2,^2,^2,12",options="header"]
|===========================================================================================
|Netflow exporter | v5 | v9 | IPFIX | Remarks
|Softflowd | y | y | y | IPFIX supported in github.com/djmdjm/softflowd
|nProbe | y | y | y |
|ipt_NETFLOW | y | y | y |
|Cisco ASA | | y | |
|Cisco IOS 12.x | | y | |
|fprobe | y | | |
|Juniper MX80 | y | | | SW > 12.3R8
|OpenBSD pflow | y | n | y | man.openbsd.org/OpenBSD-current/man4/pflow.4
|Mikrotik 6.35.4 | y | | n | wiki.mikrotik.com/wiki/Manual:IP/Traffic_Flow
|Ubiquiti Edgerouter X | | y | | With MPLS labels
|Citrix Netscaler | | | y | Still some unknown fields, labeled netscalerUnknown<id>
|===========================================================================================

Usage

Example Logstash configuration:

source, ruby

input {
  udp {
    host => localhost
    port => 2055
    codec => netflow {
      versions => [5, 9]
    }
    type => netflow
  }
  udp {
    host => localhost
    port => 4739
    codec => netflow {
      versions => [10]
      target => ipfix
    }
    type => ipfix
  }
  tcp {
    host => localhost
    port => 4739
    codec => netflow {
      versions => [10]
      target => ipfix
    }
    type => ipfix
  }
}


Constant Summary collapse

# Names of Netflow v5 fields.
# NOTE(review): presumably the PDU header fields copied into each v5 event — confirm in decode_netflow5.
NETFLOW5_FIELDS =
['version', 'flow_seq_num', 'engine_type', 'engine_id', 'sampling_algorithm', 'sampling_interval', 'flow_records']
# Names of Netflow v9 fields.
# NOTE(review): presumably the header fields copied into each v9 event — confirm in decode_netflow9.
NETFLOW9_FIELDS =
['version', 'flow_seq_num']
# Maps a numeric Netflow v9 scope identifier (1..5) to the symbol used for it.
# NOTE(review): looks like the scope field types of v9 options templates — confirm against the caller.
NETFLOW9_SCOPES =
{
  1 => :scope_system,
  2 => :scope_interface,
  3 => :scope_line_card,
  4 => :scope_netflow_cache,
  5 => :scope_template,
}
# Names of IPFIX fields.
# NOTE(review): presumably the header fields copied into each IPFIX event — confirm in decode_ipfix.
IPFIX_FIELDS =
['version']
# Matches names ending in "_switched" (note: `$` anchors at end of line, not end of string).
SWITCHED =
/_switched$/
# String key "flowset_id".
# NOTE(review): presumably the event/record key holding the flowset id — confirm at the use site.
FLOWSET_ID =
"flowset_id"

Instance Method Summary collapse

Constructor Details

#initialize(params = {}) ⇒ Netflow

Returns a new instance of Netflow.



141
142
143
144
# File 'lib/logstash/codecs/netflow.rb', line 141

# Builds a new Netflow codec instance.
#
# The arguments are handed unchanged to the superclass constructor, after
# which the instance is flagged as not thread-safe.
#
# @param params [Hash] codec settings forwarded to the parent initializer
def initialize(params = {})
  # Bare `super` forwards the same argument list this method received.
  super
  @threadsafe = false
end

Instance Method Details

#decode(payload, metadata = nil, &block) ⇒ Object

Decodes a raw Netflow v5/v9 or IPFIX payload and yields the decoded event(s) for each flow record it contains.



180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/logstash/codecs/netflow.rb', line 180

# Decodes a raw Netflow v5, Netflow v9 or IPFIX (v10) packet and yields the
# decoded event(s) for every flow record it contains.
#
# Packets whose version is not listed in @versions are skipped with a
# warning. Malformed packets (BinData::ValidityError, IOError) are logged
# as warnings rather than raised.
#
# @param payload [String] the raw packet (presumably the binary datagram as
#   received from the input — confirm against the caller)
# @param metadata [Object, nil] optional extra context; when not nil it is
#   forwarded as a third argument to decode_netflow9 (v9 packets only)
# @yield [event] each decoded event
# @return [Object] not meaningful; callers consume the yielded events
def decode(payload, metadata = nil, &block)
  header = Header.read(payload)
  version = header.version

  unless @versions.include?(version)
    @logger.warn("Ignoring Netflow version v#{version}")
    return
  end

  if version == 5
    flowset = Netflow5PDU.read(payload)
    flowset.records.each do |record|
      yield(decode_netflow5(flowset, record))
    end
  elsif version == 9
    flowset = Netflow9PDU.read(payload)
    flowset.records.each do |record|
      # Only pass metadata along when the caller supplied it, so the
      # two-argument call shape is preserved for the common case.
      events = if metadata.nil?
                 decode_netflow9(flowset, record)
               else
                 decode_netflow9(flowset, record, metadata)
               end
      events.each { |event| yield(event) }
    end
  elsif version == 10
    flowset = IpfixPDU.read(payload)
    flowset.records.each do |record|
      decode_ipfix(flowset, record).each { |event| yield(event) }
    end
  else
    # Reached only when @versions allows a version this codec cannot parse.
    @logger.warn("Unsupported Netflow version v#{version}")
  end
rescue BinData::ValidityError, IOError => e
  @logger.warn("Invalid netflow packet received (#{e})")
end

#register ⇒ Object



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/logstash/codecs/netflow.rb', line 146

# Prepares the codec for use: creates the template stores, loads the field
# definitions, and, when a cache directory is configured, restores
# previously saved templates from disk.
#
# Reads @netflow_definitions, @ipfix_definitions, @cache_save_path,
# @versions and @cache_ttl; sets @netflow_templates, @ipfix_templates,
# @netflow_fields, @ipfix_fields and (only for enabled versions with a cache
# path) @netflow_templates_cache / @ipfix_templates_cache.
#
# @return [void]
def register
  require "logstash/codecs/netflow/util"
  @netflow_templates = Vash.new()
  @ipfix_templates = Vash.new()

  base_dir = ::File.dirname(__FILE__)

  # Path to default Netflow v9 field definitions
  filename = ::File.expand_path('netflow/netflow.yaml', base_dir)
  @netflow_fields = load_definitions(filename, @netflow_definitions)

  # Path to default IPFIX field definitions
  filename = ::File.expand_path('netflow/ipfix.yaml', base_dir)
  @ipfix_fields = load_definitions(filename, @ipfix_definitions)

  # Template caching is optional; nothing more to do without a cache directory.
  return unless @cache_save_path

  if @versions.include?(9)
    netflow_cache_file = "#{@cache_save_path}/netflow_templates.cache"
    # File.exist? replaces the File.exists? alias, which was removed in Ruby 3.2.
    if ::File.exist?(netflow_cache_file)
      @netflow_templates_cache = load_templates_cache(netflow_cache_file)
      # Rebuild a BinData struct per cached template and store it with the configured TTL.
      @netflow_templates_cache.each do |key, fields|
        @netflow_templates[key, @cache_ttl] = BinData::Struct.new(:endian => :big, :fields => fields)
      end
    else
      @netflow_templates_cache = {}
    end
  end

  if @versions.include?(10)
    ipfix_cache_file = "#{@cache_save_path}/ipfix_templates.cache"
    if ::File.exist?(ipfix_cache_file)
      @ipfix_templates_cache = load_templates_cache(ipfix_cache_file)
      @ipfix_templates_cache.each do |key, fields|
        @ipfix_templates[key, @cache_ttl] = BinData::Struct.new(:endian => :big, :fields => fields)
      end
    else
      @ipfix_templates_cache = {}
    end
  end
end