Class: LogStash::Codecs::Netflow
- Inherits:
-
Base
- Object
- Base
- LogStash::Codecs::Netflow
- Defined in:
- lib/logstash/codecs/netflow.rb
Overview
The “netflow” codec is used for decoding Netflow v5/v9/v10 (IPFIX) flows.
Supported Netflow/IPFIX exporters
The following Netflow/IPFIX exporters are known to work with the most recent version of the netflow codec:
- cols=“6,^2,^2,^2,12”,options=“header”
-
|=========================================================================================== |Netflow exporter | v5 | v9 | IPFIX | Remarks |Softflowd | y | y | y | IPFIX supported in github.com/djmdjm/softflowd |nProbe | y | y | y |
|ipt_NETFLOW | y | y | y | |Cisco ASA | | y | |
|Cisco IOS 12.x | | y | |
|fprobe | y | | | |Juniper MX80 | y | | | SW > 12.3R8 |OpenBSD pflow | y | n | y | man.openbsd.org/OpenBSD-current/man4/pflow.4 |Mikrotik 6.35.4 | y | | n | wiki.mikrotik.com/wiki/Manual:IP/Traffic_Flow |Ubiquiti Edgerouter X | | y | | With MPLS labels |Citrix Netscaler | | | y | Still some unknown fields, labeled netscalerUnknown<id> |===========================================================================================
Usage
Example Logstash configuration:
- source, ruby
input {
udp { host => localhost port => 2055 codec => netflow { versions => [5, 9] } type => netflow } udp { host => localhost port => 4739 codec => netflow { versions => [10] target => ipfix } type => ipfix } tcp { host => localhost port => 4739 codec => netflow { versions => [10] target => ipfix } type => ipfix }}
Constant Summary collapse
- NETFLOW5_FIELDS =
['version', 'flow_seq_num', 'engine_type', 'engine_id', 'sampling_algorithm', 'sampling_interval', 'flow_records']
- NETFLOW9_FIELDS =
['version', 'flow_seq_num']
- NETFLOW9_SCOPES =
{ 1 => :scope_system, 2 => :scope_interface, 3 => :scope_line_card, 4 => :scope_netflow_cache, 5 => :scope_template, }
- IPFIX_FIELDS =
['version']
- SWITCHED =
/_switched$/- FLOWSET_ID =
"flowset_id"
Instance Method Summary collapse
-
#decode(payload, metadata = nil, &block) ⇒ Object
def register.
-
#initialize(params = {}) ⇒ Netflow
constructor
A new instance of Netflow.
- #register ⇒ Object
Constructor Details
#initialize(params = {}) ⇒ Netflow
Returns a new instance of Netflow.
141 142 143 144 |
# File 'lib/logstash/codecs/netflow.rb', line 141 def initialize(params = {}) super(params) @threadsafe = false end |
Instance Method Details
#decode(payload, metadata = nil, &block) ⇒ Object
def register
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 |
# File 'lib/logstash/codecs/netflow.rb', line 180 def decode(payload, = nil, &block) header = Header.read(payload) unless @versions.include?(header.version) @logger.warn("Ignoring Netflow version v#{header.version}") return end if header.version == 5 flowset = Netflow5PDU.read(payload) flowset.records.each do |record| yield(decode_netflow5(flowset, record)) end elsif header.version == 9 # BinData::trace_reading do flowset = Netflow9PDU.read(payload) flowset.records.each do |record| if != nil decode_netflow9(flowset, record, ).each{|event| yield(event)} else decode_netflow9(flowset, record).each{|event| yield(event)} end # end end elsif header.version == 10 flowset = IpfixPDU.read(payload) flowset.records.each do |record| decode_ipfix(flowset, record).each { |event| yield(event) } end else @logger.warn("Unsupported Netflow version v#{header.version}") end rescue BinData::ValidityError, IOError => e @logger.warn("Invalid netflow packet received (#{e})") end |
#register ⇒ Object
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 |
# File 'lib/logstash/codecs/netflow.rb', line 146 def register require "logstash/codecs/netflow/util" @netflow_templates = Vash.new() @ipfix_templates = Vash.new() # Path to default Netflow v9 field definitions filename = ::File.('netflow/netflow.yaml', ::File.dirname(__FILE__)) @netflow_fields = load_definitions(filename, @netflow_definitions) # Path to default IPFIX field definitions filename = ::File.('netflow/ipfix.yaml', ::File.dirname(__FILE__)) @ipfix_fields = load_definitions(filename, @ipfix_definitions) if @cache_save_path if @versions.include?(9) if File.exists?("#{@cache_save_path}/netflow_templates.cache") @netflow_templates_cache = load_templates_cache("#{@cache_save_path}/netflow_templates.cache") @netflow_templates_cache.each{ |key, fields| @netflow_templates[key, @cache_ttl] = BinData::Struct.new(:endian => :big, :fields => fields) } else @netflow_templates_cache = {} end end if @versions.include?(10) if File.exists?("#{@cache_save_path}/ipfix_templates.cache") @ipfix_templates_cache = load_templates_cache("#{@cache_save_path}/ipfix_templates.cache") @ipfix_templates_cache.each{ |key, fields| @ipfix_templates[key, @cache_ttl] = BinData::Struct.new(:endian => :big, :fields => fields) } else @ipfix_templates_cache = {} end end end end |