Class: Fluent::Plugin::NetflowipfixInput::UdpListenerThread
- Inherits: Object
- Defined in: lib/fluent/plugin/in_netflowipfix.rb
Instance Method Summary
- #close ⇒ Object
- #initialize(bind, port, udpQueue, tag, log) ⇒ UdpListenerThread (constructor): A new instance of UdpListenerThread.
- #join ⇒ Object
- #run ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize(bind, port, udpQueue, tag, log) ⇒ UdpListenerThread
Returns a new instance of UdpListenerThread.
    # File 'lib/fluent/plugin/in_netflowipfix.rb', line 194
    def initialize(bind, port, udpQueue, tag, log)
      @port = port
      @udpQueue = udpQueue
      @udp_socket = UDPSocket.new
      @udp_socket.bind(bind, port)
      @total = 0
      @tag = tag
      @log = log
    end
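Because the constructor binds the socket immediately, a port conflict would surface as an exception from the constructor rather than later in the receive loop. The following is a minimal sketch of that behavior using only the standard library. `open_listener_socket` is a hypothetical helper written for illustration and is not part of the plugin.

```ruby
require 'socket'

# Hypothetical helper mirroring what the constructor does with its socket:
# create a UDPSocket and bind it right away.
def open_listener_socket(bind, port)
  socket = UDPSocket.new
  socket.bind(bind, port)
  socket
rescue SystemCallError
  # Release the descriptor if the bind fails, then let the caller see the error.
  socket&.close
  raise
end

# Port 0 asks the OS for any free port; addr[1] reports the one it chose.
first = open_listener_socket('127.0.0.1', 0)
port = first.addr[1]

begin
  # A second bind to the same address and port fails immediately.
  open_listener_socket('127.0.0.1', port)
rescue Errno::EADDRINUSE => e
  puts "bind failed: #{e.class}"
ensure
  first.close
end
```

A caller that constructs the listener may want to rescue `Errno::EADDRINUSE` (or `SystemCallError` more broadly) around the constructor call.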
Instance Method Details
#close ⇒ Object
    # File 'lib/fluent/plugin/in_netflowipfix.rb', line 209
    def close
      @udp_socket.close
    end
#join ⇒ Object
    # File 'lib/fluent/plugin/in_netflowipfix.rb', line 213
    def join
      @thread.join
    end
#run ⇒ Object
    # File 'lib/fluent/plugin/in_netflowipfix.rb', line 218
    def run
      nb = 0
      loop do
        msg, sender = @udp_socket.recvfrom(4096)
        @total = @total + msg.length
        @log.trace "UdpListenerThread::recvfrom #{msg.length} bytes for #{@total} total on UDP/#{@port}"
        record = {}
        record["message"] = msg
        record["length"] = msg.length
        record["total"] = @total
        record["sender"] = sender
        record["port"] = @port
        # time = EventTime.new()
        time = Time.now.getutc
        @udpQueue << [time, record]
        # Garbage collection
        msg = nil
        sender = nil
        nb = nb + 1
        if nb > 100
          GC.start
          nb = 0
        end
      end
    end
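To make the shape of the queued items concrete, here is a sketch of a single iteration of the receive loop over the loopback interface. It uses the same record keys as `#run`, but `receive_one` and `demo_receive` are hypothetical helpers, and a plain `Queue` stands in for whatever object the plugin passes as `udpQueue`.

```ruby
require 'socket'

# Hypothetical helper: one iteration of the receive loop, building the same
# record keys as #run. `total` is the running byte count before this datagram.
def receive_one(socket, queue, port, total)
  # For UDP, any part of a datagram beyond 4096 bytes is discarded.
  msg, sender = socket.recvfrom(4096)
  total += msg.length
  record = {
    "message" => msg,
    "length"  => msg.length,
    "total"   => total,
    "sender"  => sender, # [address family, port, hostname, numeric address]
    "port"    => port
  }
  queue << [Time.now.getutc, record]
  total
end

# Sends one datagram to a loopback listener and returns the queued item.
def demo_receive
  receiver = UDPSocket.new
  receiver.bind('127.0.0.1', 0) # port 0: let the OS pick a free port
  port = receiver.addr[1]
  queue = Queue.new

  sender = UDPSocket.new
  sender.send('hello', 0, '127.0.0.1', port)

  receive_one(receiver, queue, port, 0)
  queue.pop
ensure
  sender&.close
  receiver&.close
end

time, record = demo_receive
puts "#{time.iso8601} #{record['length']} bytes from #{record['sender'][3]}"
```

A consumer of the queue therefore receives `[time, record]` pairs, where `time` is a UTC `Time` and `record["sender"]` is the address array returned by `recvfrom`.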
#start ⇒ Object
    # File 'lib/fluent/plugin/in_netflowipfix.rb', line 204
    def start
      @thread = Thread.new(&method(:run))
      @log.trace "UdpListenerThread::start"
    end
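The methods above suggest a start, close, join lifecycle. The sketch below shows one plausible way it fits together; `SketchListener` is a hypothetical stand-in with the same method shape, not the plugin's class. It assumes that closing the socket makes the pending `recvfrom` raise `IOError`, and it rescues that error so the thread ends cleanly. `#run` as listed has no such rescue, and `Thread#join` re-raises an exception that terminated the thread, so how the plugin handles shutdown is not shown in this section.

```ruby
require 'socket'

# Hypothetical stand-in with the same start/close/join shape as
# UdpListenerThread; it is not the plugin's class.
class SketchListener
  def initialize(bind, port, queue)
    @queue = queue
    @socket = UDPSocket.new
    @socket.bind(bind, port)
  end

  # Port actually bound (useful when 0 was requested).
  def port
    @socket.addr[1]
  end

  def start
    @thread = Thread.new(&method(:run))
  end

  def run
    loop do
      msg, _sender = @socket.recvfrom(4096)
      @queue << msg
    end
  rescue IOError, Errno::EBADF
    # Assumption: the socket was closed by #close, so treat this as shutdown.
  end

  def close
    @socket.close
  end

  # Returns nil if the thread is still running after `timeout` seconds.
  def join(timeout = nil)
    @thread.join(timeout)
  end
end

queue = Queue.new
listener = SketchListener.new('127.0.0.1', 0, queue)
listener.start

client = UDPSocket.new
client.send('ping', 0, '127.0.0.1', listener.port)
puts queue.pop # blocks until the listener thread has queued the datagram
client.close

listener.close   # ends the receive loop
listener.join(2) # wait up to two seconds for the thread to finish
```

Calling `close` before `join` matters here: without it the loop never ends and an untimed `join` would block indefinitely.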