Class: Fluent::PcapngInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_pcapng.rb

Constant Summary collapse

LONG =
"long"
DOUBLE =
"double"
STRING =
"string"
TIME =
"time"

Instance Method Summary collapse

Constructor Details

#initializePcapngInput

Returns a new instance of PcapngInput.



33
34
35
# File 'lib/fluent/plugin/in_pcapng.rb', line 33

def initialize
  super
end

Instance Method Details

#build_extra_flags(extra_flags) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/fluent/plugin/in_pcapng.rb', line 104

def build_extra_flags(extra_flags)
  options = ""
  valid_flag_re = /(?:-[a-zA-Z]|--[a-z\-]+)/
  extra_flags.each do |i|
    if !i.match(/^#{valid_flag_re}/)
      raise ArgumentError, format("Invalid flags in extra_flags %s", i)
    end

    # escape given flags here because it is easier to understand, or write,
    # extra_flags in fluentd config.
    (k, v) = i.split(/\s+/, 2)
    options += "#{Shellwords.escape(k)} "
    options += "#{Shellwords.escape(v)} " if v
  end
  return options
end

#build_options(fields) ⇒ Object



96
97
98
99
100
101
102
# File 'lib/fluent/plugin/in_pcapng.rb', line 96

def build_options(fields)
  options = ""
  fields.each do |field|
    options += "-e #{Shellwords.escape(field)} "
  end
  return options
end

#collect_tshark_output(stdout) ⇒ Object



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/fluent/plugin/in_pcapng.rb', line 121

def collect_tshark_output(stdout)
  collected = []
  begin
    readlines_nonblock(stdout).each do |line|
      array = CSV.parse(line).flatten
      collected << array
    end
  rescue => e
    log.error "pcapng failed to read or parse line", :error => e.to_s,
      :error_class => e.class.to_s
  end

  collected.each do |ary|
    router.emit(@tag, Engine.now, generate_record(ary))
  end
rescue => e
  log.error "pcapng failed to collect output from tshark",
    :error => e.to_s,
    :error_class => e.class.to_s
end

#configure(conf) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
# File 'lib/fluent/plugin/in_pcapng.rb', line 48

def configure(conf)
  super

  if @fields != nil and @fields == []
    raise ConfigError, "'fields' option is required on pcapng input"
  end

  if @types and @types.length != @fields.length
    raise ConfigError, "'types' length must be equal to 'fields' length"
  end
end

#convert_type(val, type) ⇒ Object



173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/fluent/plugin/in_pcapng.rb', line 173

def convert_type val, type
  v = val.to_s.gsub("\"", "")
  v = "" if val == nil
  case type
  when LONG
    if v.is_a?(String) and v.match(/^0x[0-9a-fA-F]+$/)
      v = v.hex
    else
      v = v.to_i
    end
  when DOUBLE
    v = v.to_f
  when TIME
    v = Time.parse(v)
  end
  return v
end

#convert_types(array) ⇒ Object



167
168
169
170
171
# File 'lib/fluent/plugin/in_pcapng.rb', line 167

def convert_types(array)
  return [array, @types].transpose.map{|val, type|
    convert_type(val, type)
  }
end

#generate_record(array) ⇒ Object



155
156
157
158
159
160
161
162
163
164
165
# File 'lib/fluent/plugin/in_pcapng.rb', line 155

def generate_record(array)
  fields = @fields
  if fields.length != array.length
    return {}
  end
  carray = convert_types(array)
  if @convertdot
    fields = fields.map{|field| field.gsub(".", @convertdot)}
  end
  return Hash[[fields, carray].transpose]
end

#readlines_nonblock(io) ⇒ Object



142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/fluent/plugin/in_pcapng.rb', line 142

def readlines_nonblock(io)
  @nbbuffer = "" if @nbbuffer == nil
  @nbbuffer += io.read_nonblock(65535)
  lines = []
  while idx = @nbbuffer.index("\n")
    lines << @nbbuffer[0..idx-1]
    @nbbuffer = @nbbuffer[idx+1..-1]
  end
  return lines
rescue
  return []
end

#runObject



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/fluent/plugin/in_pcapng.rb', line 77

def run
  options = build_options(@fields)
  options += build_extra_flags(@extra_flags)
  cmdline = "tshark -i #{Shellwords.escape(@interface)} -T fields -E separator=\",\" -E quote=d #{options}"
  log.debug format("pcapng: %s", cmdline)
  _stdin, stdout, stderr, @th_tshark = *Open3.popen3(cmdline)

  while @th_tshark.alive?
    collect_tshark_output(stdout)
  end
  stderr.each do |l|
    log.error(l.chomp)
  end
  raise RuntimeError, "tshark is not running"
rescue => e
  log.error "unexpected error", :error => e.to_s
  log.error_backtrace e.backtrace
end

#shutdownObject



66
67
68
69
70
71
72
73
74
75
# File 'lib/fluent/plugin/in_pcapng.rb', line 66

def shutdown
  if @th_tshark and @th_tshark.alive?
    Process.kill("INT", @th_tshark.pid)
  end
  @thread.join
rescue => e
  log.error "pcapng failed to shutdown", :error => e.to_s,
    :error_class => e.class.to_s
  log.error_backtrace e.backtrace
end

#startObject



60
61
62
63
64
# File 'lib/fluent/plugin/in_pcapng.rb', line 60

def start
  super

  @thread = Thread.new(&method(:run))
end