Class: Fluent::UdpEventInput
- Inherits:
-
Input
- Object
- Input
- Fluent::UdpEventInput
- Defined in:
- lib/fluent/plugin/in_udp_event.rb
Overview
Fluentd UDP input main class
Defined Under Namespace
Classes: UdpHandler
Constant Summary collapse
- MAX_BLOCKTIME =
2
Instance Method Summary collapse
- #configure(conf) ⇒ Object
-
#initialize ⇒ UdpEventInput
constructor
A new instance of UdpEventInput.
- #run ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize ⇒ UdpEventInput
Returns a new instance of UdpEventInput.
15 16 17 18 |
# File 'lib/fluent/plugin/in_udp_event.rb', line 15

# Builds a new UdpEventInput.
#
# Delegates to the parent constructor first, then loads the socket utility
# library. The require lives inside the constructor, so the library is only
# loaded once an instance is actually created.
def initialize
  super
  require 'fluent/plugin/socket_util'
end
Instance Method Details
#configure(conf) ⇒ Object
24 25 26 |
# File 'lib/fluent/plugin/in_udp_event.rb', line 24

# Applies the plugin configuration.
#
# This plugin adds no configuration handling of its own here; the call is
# forwarded unchanged to the parent class.
#
# @param conf the configuration object handed over by the caller
def configure(conf)
  super
end
#run ⇒ Object
67 68 69 70 71 72 |
# File 'lib/fluent/plugin/in_udp_event.rb', line 67

# Runs the event loop held in @loop until it returns.
#
# Any exception raised while the loop is running is logged (message first,
# then the backtrace) instead of being propagated to the caller.
def run
  @loop.run
rescue Exception => e # kept as broad as in the original listing
  # The listing was truncated after `e.`; log the exception's message text.
  $log.error 'unexpected error', error: e.to_s
  $log.error_backtrace
end
#shutdown ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/fluent/plugin/in_udp_event.rb', line 43

# Stops the event loop, closes the UDP handler, detaches every watcher and
# waits for the worker thread to finish.
def shutdown
  # A repeating timer fires every MAX_BLOCKTIME seconds so the loop gets an
  # event regularly; this prevents the 60 second timeout
  # (see ext/libev/ev.c MAX_BLOCKTIME).
  @watcher = Cool.io::TimerWatcher.new(MAX_BLOCKTIME, true)
  @loop.attach(@watcher)

  $log.debug 'stopping event loop'
  begin
    @loop.stop
  rescue RuntimeError
    # Ignored on purpose: shutdown continues even if the loop cannot be
    # stopped (presumably because it is not running -- confirm).
  end

  $log.debug "closing udp socket on #{@bind}:#{@port}"
  @handler.close

  $log.debug 'closing watchers'
  @loop.watchers.each(&:detach)

  $log.debug 'waiting for thread to finish'
  @thread.join
  $log.debug 'thread finished'

  $log.debug 'terminating'
end
#start ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/fluent/plugin/in_udp_event.rb', line 28

# Opens the UDP socket, wires it to a new event loop through a UdpHandler
# and starts the thread that runs the loop.
def start
  # Bound method handed to the handler as its receive callback.
  on_receive = method(:receive_data)

  @loop = Coolio::Loop.new(backend: :epoll)

  $log.debug "listening udp socket on #{@bind}:#{@port}"
  @usock = SocketUtil.create_udp_socket(@bind)
  @usock.bind(@bind, @port)

  @handler = UdpHandler.new(@usock, @max_message_size, on_receive)
  @loop.attach(@handler)

  # The loop runs on its own thread, stored in @thread.
  @thread = Thread.new { run }
end