Class: LogStash::Codecs::Sflow
- Inherits:
-
Base
- Object
- Base
- LogStash::Codecs::Sflow
- Defined in:
- lib/logstash/codecs/sflow.rb
Overview
The “sflow” codec is for decoding sflow v5 flows.
Instance Method Summary collapse
-
#assign_key_value(event, bindata_kv) ⇒ Object
Copies each key/value pair of bindata_kv onto the event as strings, skipping keys listed in @removed_field.
- #common_sflow(event, decoded, sample, record) ⇒ Object
- #decode(payload) ⇒ Object
-
#initialize(params = {}) ⇒ Sflow
constructor
A new instance of Sflow.
- #register ⇒ Object
Constructor Details
#initialize(params = {}) ⇒ Sflow
Returns a new instance of Sflow.
16 17 18 19 20 21 |
# File 'lib/logstash/codecs/sflow.rb', line 16

# Builds the codec and computes @removed_field, the list of field names
# that #assign_key_value refuses to copy onto an event.
#
# @param params [Hash] settings, handed unchanged to the parent constructor
def initialize(params = {})
  super(params)
  @threadsafe = false

  # Field names removed by default.
  builtin_removed = %w[
    records record_data record_length record_count record_entreprise record_format
    samples sample_data sample_entreprise sample_format sample_length sample_count
    sample_header layer3 layer4 layer4_data header udata
  ]

  # The defaults are unioned with @optional_removed_field.
  # NOTE(review): presumably that variable is populated by super(params) from
  # the plugin settings -- confirm.
  # noinspection RubyResolve
  @removed_field = builtin_removed | @optional_removed_field
end
Instance Method Details
#assign_key_value(event, bindata_kv) ⇒ Object
Copies each key/value pair of bindata_kv onto the event as strings, skipping keys listed in @removed_field.
25 26 27 28 29 30 31 |
# File 'lib/logstash/codecs/sflow.rb', line 25

# Copies the key/value pairs of a decoded structure onto the event.
#
# Keys and values are both converted with #to_s. Keys listed in
# @removed_field are skipped.
#
# @param event [#[]=] object receiving the fields
# @param bindata_kv [#each_pair, Object] structure to copy; anything that does
#   not respond to #each_pair (nil, an empty string, ...) is ignored
# @return [Object, nil] the result of each_pair, or nil when bindata_kv cannot
#   be iterated
def assign_key_value(event, bindata_kv)
  # An absent sub-structure cannot be iterated: skip it instead of raising
  # NoMethodError and losing the whole datagram.
  return unless bindata_kv.respond_to?(:each_pair)

  bindata_kv.each_pair do |k, v|
    field = k.to_s
    next if @removed_field.include?(field)

    event[field] = v.to_s
  end
end
#common_sflow(event, decoded, sample, record) ⇒ Object
37 38 39 40 41 42 43 |
# File 'lib/logstash/codecs/sflow.rb', line 37

# Copies onto the event, from the outermost structure to the innermost one,
# the fields of the datagram, of the sample and its 'sample_data', then of
# the record and its 'record_data'. Every copy goes through
# #assign_key_value.
#
# @param event [Object] event receiving the fields
# @param decoded [Object] decoded datagram
# @param sample [Object] sample being processed; must answer ['sample_data']
# @param record [Object] record being processed; must answer ['record_data']
# @return [Object] whatever the last #assign_key_value call returned
def common_sflow(event, decoded, sample, record)
  assign_key_value(event, decoded)

  # Each container is copied first, immediately followed by its nested payload.
  nested = [[sample, 'sample_data'], [record, 'record_data']]
  nested.map do |container, payload_key|
    assign_key_value(event, container)
    assign_key_value(event, container[payload_key])
  end.last
end
#decode(payload) ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/logstash/codecs/sflow.rb', line 53

# Decodes one sFlow datagram and yields the resulting events.
#
# The header is read first; a datagram whose version is not listed in
# @versions is logged and dropped. Otherwise every sample is inspected:
#
# * a sample whose 'sample_data' renders as an empty string is logged and
#   skipped;
# * an entreprise 0 / format 1 sample (flow sample) produces one event that
#   merges all of its records;
# * an entreprise 0 / format 2 sample (counter sample) produces one event
#   per record;
# * any other sample is ignored without a log line.
#
# Events are collected first and only yielded once the whole datagram has
# been processed.
#
# @param payload [Object] raw datagram, handed to SFlowHeader.read and SFlow.read
# @yieldparam event [LogStash::Event] one decoded event
# @return [Array<LogStash::Event>, nil] the yielded events, or nil when the
#   version is ignored
def decode(payload)
  header = SFlowHeader.read(payload)
  unless @versions.include?(header.sflow_version)
    @logger.warn("Ignoring Sflow version v#{header.sflow_version}")
    return
  end

  decoded = SFlow.read(payload)
  events = []

  decoded['samples'].each do |sample|
    # Treat case with no flow decoded (unknown flow)
    if sample['sample_data'].to_s.eql? ''
      @logger.warn("Unknown sample entreprise #{sample['sample_entreprise'].to_s} - format #{sample['sample_format'].to_s}")
      next
    end

    if sample['sample_entreprise'] == 0 && sample['sample_format'] == 1
      # Flow sample: a single event for the whole sample
      events.push(flow_sample_event(decoded, sample))
    elsif sample['sample_entreprise'] == 0 && sample['sample_format'] == 2
      # Counter sample: one event per record
      events.concat(counter_sample_events(decoded, sample))
    end
  end

  events.each { |event| yield event }
end

# Builds the single event of a flow sample by merging every decodable record
# into it, then adds frame_length_times_sampling_rate when both operands exist.
def flow_sample_event(decoded, sample)
  event = LogStash::Event.new

  each_decodable_record(sample) do |record|
    common_sflow(event, decoded, sample, record)

    sample_header = record['record_data']['sample_header']
    next if sample_header.to_s.eql? ''

    assign_key_value(event, sample_header)
    next unless sample_header.has_key?('layer3')

    layer3_header = sample_header['layer3']['header']
    assign_key_value(event, layer3_header)
    assign_key_value(event, layer3_header['layer4'])
  end

  # Compute frame_length_times_sampling_rate
  if event.include?('frame_length') && event.include?('sampling_rate')
    event["frame_length_times_sampling_rate"] = event['frame_length'].to_i * event['sampling_rate'].to_i
  end

  event
end

# Builds one event for each decodable record of a counter sample.
def counter_sample_events(decoded, sample)
  events = []
  each_decodable_record(sample) do |record|
    event = LogStash::Event.new
    common_sflow(event, decoded, sample, record)
    events.push(event)
  end
  events
end

# Yields every record of the sample that carries data; a record whose
# 'record_data' renders as an empty string is logged and skipped.
def each_decodable_record(sample)
  sample['sample_data']['records'].each do |record|
    if record['record_data'].to_s.eql? ''
      @logger.warn("Unknown record entreprise #{record['record_entreprise'].to_s}, format #{record['record_format'].to_s}")
      next
    end

    yield record
  end
end

private :flow_sample_event, :counter_sample_events, :each_decodable_record
#register ⇒ Object
46 47 48 |
# File 'lib/logstash/codecs/sflow.rb', line 46 def register require 'logstash/codecs/sflow/datagram' end |