Class: Fluent::PcapngInput
- Inherits:
-
Input
- Object
- Input
- Fluent::PcapngInput
- Defined in:
- lib/fluent/plugin/in_pcapng.rb
Constant Summary collapse
- LONG = "long"
- DOUBLE = "double"
- STRING = "string"
- TIME = "time"
Instance Method Summary collapse
- #build_extra_flags(extra_flags) ⇒ Object
- #build_options(fields) ⇒ Object
- #collect_tshark_output(stdout) ⇒ Object
- #configure(conf) ⇒ Object
- #convert_type(val, type) ⇒ Object
- #convert_types(array) ⇒ Object
- #generate_record(array) ⇒ Object
- #initialize ⇒ PcapngInput (constructor): A new instance of PcapngInput.
- #readlines_nonblock(io) ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize ⇒ PcapngInput
Returns a new instance of PcapngInput.
33 34 35 |
# File 'lib/fluent/plugin/in_pcapng.rb', line 33 def initialize super end |
Instance Method Details
#build_extra_flags(extra_flags) ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/fluent/plugin/in_pcapng.rb', line 104
# Builds the shell-escaped argument string for user-supplied extra flags.
#
# Each entry must start with a short flag ("-x") or a long flag ("--some-flag"),
# optionally followed by whitespace and a value. The flag and the value are
# escaped separately, so a value containing spaces stays a single argument.
#
# @param extra_flags [Array<String>, nil] entries such as "-n" or "-f tcp port 80"
# @return [String] the escaped flags, each followed by one space ("" when empty)
# @raise [ArgumentError] if an entry does not start with a valid flag
def build_extra_flags(extra_flags)
  # \A (not ^) anchors the check to the start of the whole entry rather than
  # to the start of any line inside it.
  valid_flag_re = /\A(?:-[a-zA-Z]|--[a-z\-]+)/
  options = ""
  Array(extra_flags).each do |flag|
    unless flag.match(valid_flag_re)
      raise ArgumentError, format("Invalid flags in extra_flags %s", flag)
    end
    # escape given flags here because it is easier to understand, or write,
    # extra_flags in fluentd config.
    name, value = flag.split(/\s+/, 2)
    options += "#{Shellwords.escape(name)} "
    options += "#{Shellwords.escape(value)} " if value
  end
  options
end
#build_options(fields) ⇒ Object
96 97 98 99 100 101 102 |
# File 'lib/fluent/plugin/in_pcapng.rb', line 96
# Builds the "-e <field>" argument string that selects tshark output fields.
#
# @param fields [Array<String>] tshark field names, e.g. "ip.src"
# @return [String] one shell-escaped "-e field " segment per field, in order
#   ("" when fields is empty)
def build_options(fields)
  fields.map { |field| "-e #{Shellwords.escape(field)} " }.join
end
#collect_tshark_output(stdout) ⇒ Object
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/fluent/plugin/in_pcapng.rb', line 121
# Reads the currently available tshark output lines, parses each one as CSV
# and emits one record per successfully parsed line.
#
# A line that fails to parse is logged and skipped on its own, so one
# malformed line does not discard the other lines read in the same call.
# Any other failure (e.g. while building or emitting a record) is logged and
# swallowed.
#
# @param stdout [IO] the stream handed to readlines_nonblock
# @return [Object] not meaningful; the method is called for its side effects
def collect_tshark_output(stdout)
  rows = []
  readlines_nonblock(stdout).each do |line|
    begin
      rows << CSV.parse(line).flatten
    rescue => e
      log.error "pcapng failed to read or parse line",
                :error => e.to_s, :error_class => e.class.to_s
    end
  end
  rows.each do |row|
    router.emit(@tag, Engine.now, generate_record(row))
  end
rescue => e
  log.error "pcapng failed to collect output from tshark",
            :error => e.to_s, :error_class => e.class.to_s
end
#configure(conf) ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/fluent/plugin/in_pcapng.rb', line 48
# Validates the plugin configuration after the parent class has processed it.
#
# @param conf [Object] the configuration object, forwarded to the parent class
# @raise [ConfigError] if 'fields' is missing or empty, or if 'types' is set
#   and its length differs from the length of 'fields'
def configure(conf)
  super
  # Reject nil as well as []: a nil @fields would otherwise slip through and
  # fail below with NoMethodError on @fields.length.
  if @fields.nil? || @fields.empty?
    raise ConfigError, "'fields' option is required on pcapng input"
  end
  if @types && @types.length != @fields.length
    raise ConfigError, "'types' length must be equal to 'fields' length"
  end
end
#convert_type(val, type) ⇒ Object
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 |
# File 'lib/fluent/plugin/in_pcapng.rb', line 173
# Converts a single raw field value to the requested type.
#
# The value is first turned into a string with every double quote removed
# (nil becomes ""). Then:
# * LONG   - "0x..." hexadecimal strings are parsed as hex, anything else via to_i
# * DOUBLE - parsed via to_f
# * TIME   - parsed via Time.parse (which raises on unparsable input)
# * any other type - the cleaned string is returned unchanged
#
# @param val [Object, nil] raw field value
# @param type [String, nil] one of the type constants, or anything else for "string"
# @return [Integer, Float, Time, String] the converted value
def convert_type(val, type)
  v = val.to_s.delete('"')
  case type
  when LONG
    # Whole-string anchors: only a pure hex literal is treated as hex.
    v =~ /\A0x[0-9a-fA-F]+\z/ ? v.hex : v.to_i
  when DOUBLE
    v.to_f
  when TIME
    Time.parse(v)
  else
    v
  end
end
#convert_types(array) ⇒ Object
167 168 169 170 171 |
# File 'lib/fluent/plugin/in_pcapng.rb', line 167
# Converts every value of a parsed row according to the type configured at
# the same position in @types.
#
# @param array [Array] raw field values of one row
# @return [Array] the converted values, in the same order
def convert_types(array)
  value_type_pairs = [array, @types].transpose
  value_type_pairs.map do |value, type|
    convert_type(value, type)
  end
end
#generate_record(array) ⇒ Object
155 156 157 158 159 160 161 162 163 164 165 |
# File 'lib/fluent/plugin/in_pcapng.rb', line 155
# Builds the record hash for one parsed row, keyed by the configured field names.
#
# When @convertdot is set, every "." in a field name is replaced with it
# before the name is used as a key.
#
# @param array [Array] raw field values of one row
# @return [Hash] field name => converted value; an empty hash when the row
#   does not have exactly one value per configured field
def generate_record(array)
  return {} unless @fields.length == array.length

  values = convert_types(array)
  keys =
    if @convertdot
      @fields.map { |name| name.gsub(".", @convertdot) }
    else
      @fields
    end
  Hash[keys.zip(values)]
end
#readlines_nonblock(io) ⇒ Object
142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/fluent/plugin/in_pcapng.rb', line 142
# Reads whatever is currently available from io without blocking and returns
# the complete lines seen so far.
#
# Data after the last newline is kept in @nbbuffer and prepended to the next
# read, so a line split across reads is returned once it is complete.
#
# @param io [IO] stream to read from (up to 65535 bytes per call)
# @return [Array<String>] complete lines without their trailing "\n"; an
#   empty array when nothing could be read (no data yet, EOF, or any other
#   StandardError raised while reading)
def readlines_nonblock(io)
  @nbbuffer ||= ""
  @nbbuffer += io.read_nonblock(65535)
  lines = []
  while (idx = @nbbuffer.index("\n"))
    # Exclusive range: with idx == 0 (empty line) this yields "" instead of
    # the whole buffer, which is what [0..idx-1] == [0..-1] would return.
    lines << @nbbuffer[0...idx]
    @nbbuffer = @nbbuffer[(idx + 1)..-1]
  end
  lines
rescue
  []
end
#run ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/fluent/plugin/in_pcapng.rb', line 77
# Body of the worker thread: launches tshark and forwards its output until
# the tshark process is no longer alive.
#
# The command line is assembled from @interface, @fields and @extra_flags,
# with every user-supplied part shell-escaped. While tshark is alive its
# stdout is drained via collect_tshark_output. Once it has exited, everything
# it wrote to stderr is logged as errors and a RuntimeError is raised, which
# (like any other StandardError) is logged by the rescue clause below.
#
# @return [Object] not meaningful; the method is called for its side effects
def run
  stdin = stdout = stderr = nil
  options = build_options(@fields)
  options += build_extra_flags(@extra_flags)
  cmdline = "tshark -i #{Shellwords.escape(@interface)} -T fields -E separator=\",\" -E quote=d #{options}"
  log.debug format("pcapng: %s", cmdline)
  stdin, stdout, stderr, @th_tshark = *Open3.popen3(cmdline)
  while @th_tshark.alive?
    # Wait (at most 1s) for stdout to become readable instead of spinning on
    # non-blocking reads; the timeout lets the loop re-check liveness.
    collect_tshark_output(stdout) if IO.select([stdout], nil, nil, 1)
  end
  stderr.each do |l|
    log.error(l.chomp)
  end
  raise RuntimeError, "tshark is not running"
rescue => e
  log.error "unexpected error", :error => e.to_s
  log.error_backtrace e.backtrace
ensure
  # Release the pipes opened by popen3 (still nil if popen3 itself failed).
  [stdin, stdout, stderr].each { |io| io.close if io && !io.closed? }
end
#shutdown ⇒ Object
66 67 68 69 70 71 72 73 74 75 |
# File 'lib/fluent/plugin/in_pcapng.rb', line 66
# Stops the plugin: sends SIGINT to the tshark process if it is still alive,
# then waits for the worker thread to finish.
#
# Any StandardError raised along the way is logged together with its
# backtrace and is not re-raised.
#
# @return [Object] not meaningful; the method is called for its side effects
def shutdown
  tshark = @th_tshark
  Process.kill("INT", tshark.pid) if tshark && tshark.alive?
  @thread.join
rescue => e
  log.error "pcapng failed to shutdown",
            :error => e.to_s, :error_class => e.class.to_s
  log.error_backtrace e.backtrace
end
#start ⇒ Object
60 61 62 63 64 |
# File 'lib/fluent/plugin/in_pcapng.rb', line 60
# Starts the plugin: after the parent class's start logic, spawns the worker
# thread that executes #run and keeps it in @thread.
#
# @return [Thread] the newly created worker thread
def start
  super
  @thread = Thread.new { run }
end