Class: Fluent::Plugin::NetflowipfixInput

Inherits:
Input
  • Object
show all
Includes:
DetachMultiProcessMixin
Defined in:
lib/fluent/plugin/in_netflowipfix.rb,
lib/fluent/plugin/parser_netflow_v5.rb,
lib/fluent/plugin/parser_netflow_v9.rb,
lib/fluent/plugin/netflowipfix_records.rb

Defined Under Namespace

Classes: Header, IP4Addr, IP6Addr, MacAddr, MplsLabel, Netflow10Packet, Netflow5Packet, Netflow9Packet, OctetArray1, OctetArray2, Option10, Option9, ParserIPfixv10, ParserNetflowBase, ParserNetflowIpfix, ParserNetflowv5, ParserNetflowv9, ParserThread, PortConnection, Template10, Template9, UdpListenerThread

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



94
95
96
97
98
99
100
101
102
103
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 94

def configure(conf)
  super
  $log.debug "NetflowipfixInput::configure: #{@bind}:#{@port}"
  @@connections ||=  {}
  if @@connections.nil?
  end
  @@connections[@port] = PortConnection.new(@bind, @port, @tag, @cache_ttl, @definitions, @queuesleep, log)
  log.debug "NetflowipfixInput::configure NB=#{@@connections.length}"  
  @total = 0
end

#restartConnectionsObject



134
135
136
137
138
139
140
141
142
143
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 134

def restartConnections
    @@connections.each do | port, conn |
      $log.debug "restart parser #{conn.bind}:#{conn.port}"
      conn.restartParser       
    end
      before = GC.stat(:total_freed_objects)
      GC.start
      after = GC.stat(:total_freed_objects)

end

#shutdownObject



120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 120

def shutdown
  super
  $log.debug "NetflowipfixInput::shutdown NB=#{@@connections.length}"  
  if @@connections.nil?
  else
    @@connections.each do | port, conn |
      $log.debug "shutdown listening UDP on #{conn.bind}:#{conn.port}"
      conn.stop        
    end
    @@connections = nil
  end

end

#startObject



105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 105

def start
  super
  
  $log.debug "NetflowipfixInput::start NB=#{@@connections.length}" 
  if @@connections.nil?
  else
    @@connections.each do | port, conn |
      $log.debug "start listening UDP on #{conn.bind}:#{conn.port}"
      conn.start       
    end
  end      
  
  waitForEvents
end

#waitForEventsObject



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 145

def waitForEvents
  timeStart = Time.now.getutc.to_i
  nb = 0
  loop do
      @@connections.each do | port, conn |
        if (conn.event_queue_length > 0) 
          $log.trace "waitForEvents: #{conn.bind}:#{conn.port} queue has #{conn.event_queue_length} elements"
          nbq = conn.event_queue_length 
          loop do
            ar = conn.event_pop     
            time = ar[0]
            record = ar[1]
            router.emit(conn.tag, EventTime.new(time.to_i), record)
            # Free up variables for garbage collection
            ar = nil
            time = nil
            record = nil
            nb = nb + 1
            nbq = nbq - 1
            break if nbq == 0
          end 
        end
      end
#         @log.trace "NetflowipfixInput::waitForEvents ObjectSpace.memsize_of(NetflowipfixInput)=#{ObjectSpace.memsize_of(self)}"
      if Time.now.getutc.to_i - timeStart > 600 # 300 = 5 min
        restartConnections
        timeStart = Time.now.getutc.to_i
      end

      # Garbage collection
      if nb >= 20
        nb = 0
#           debugSpace
      end
      before = GC.stat(:total_freed_objects)
      GC.start
      after = GC.stat(:total_freed_objects)
#         $log.trace "waitForEvents: sleep #{@queuesleep}"
      sleep(@queuesleep)

  end

end