Class: Fluent::Plugin::NetflowipfixInput::UdpListenerThread

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/in_netflowipfix.rb

Instance Method Summary collapse

Constructor Details

#initialize(bind, port, udpQueue, tag, log) ⇒ UdpListenerThread

Returns a new instance of UdpListenerThread.



194
195
196
197
198
199
200
201
202
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 194

def initialize(bind, port, udpQueue, tag, log)
	@port = port
	@udpQueue = udpQueue
	@udp_socket = UDPSocket.new
	@udp_socket.bind(bind, port)
	@total = 0
	@tag = tag
	@log = log
end

Instance Method Details

#closeObject



209
210
211
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 209

def close
		@udp_socket.close
end

#joinObject



213
214
215
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 213

def join
		@thread.join
end

#runObject



218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 218

def run
	nb = 0 
		loop do
			msg, sender =  @udp_socket.recvfrom(4096)
			@total = @total + msg.length
			@log.trace "UdpListenerThread::recvfrom #{msg.length} bytes for #{@total} total on UDP/#{@port}"
			record = {}
			record["message"] = msg
			record["length"] = msg.length
			record["total"] = @total
			record["sender"] = sender
			record["port"] = @port
#				time = EventTime.new()
			time = Time.now.getutc
			@udpQueue << [time, record]
			# Garbage collection
			msg = nil
			sender = nil
			nb = nb + 1
			if nb > 100
				GC.start
				nb = 0
			end

		end
end

#startObject



204
205
206
207
# File 'lib/fluent/plugin/in_netflowipfix.rb', line 204

def start
	@thread = Thread.new(&method(:run))
	@log.trace "UdpListenerThread::start"
end