Class: Fluent::UdpEventInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_udp_event.rb

Overview

Fluentd UDP input main class

Defined Under Namespace

Classes: UdpHandler

Constant Summary collapse

MAX_BLOCKTIME =
2

Instance Method Summary collapse

Constructor Details

#initializeUdpEventInput

Returns a new instance of UdpEventInput.



15
16
17
18
# File 'lib/fluent/plugin/in_udp_event.rb', line 15

def initialize
  super
  require 'fluent/plugin/socket_util'
end

Instance Method Details

#configure(conf) ⇒ Object



24
25
26
# File 'lib/fluent/plugin/in_udp_event.rb', line 24

def configure(conf)
  super
end

#runObject



67
68
69
70
71
72
# File 'lib/fluent/plugin/in_udp_event.rb', line 67

def run
  @loop.run
rescue Exception => e
  $log.error 'unexpected error', error: e.message
  $log.error_backtrace
end

#shutdownObject



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/fluent/plugin/in_udp_event.rb', line 43

def shutdown
  # Force event every MAX_BLOCKTIME seconds to prevent 60 second timeout
  # see ext/libev/ev.c MAX_BLOCKTIME
  @watcher = Cool.io::TimerWatcher.new(MAX_BLOCKTIME, true)
  @loop.attach(@watcher)

  $log.debug 'stopping event loop'
  begin
    @loop.stop
  rescue RuntimeError
  end

  $log.debug "closing udp socket on #{@bind}:#{@port}"
  @handler.close

  $log.debug 'closing watchers'
  @loop.watchers.each { |w| w.detach }

  $log.debug 'waiting for thread to finish'
  @thread.join
  $log.debug 'thread finished'
  $log.debug 'terminating'
end

#startObject



28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/fluent/plugin/in_udp_event.rb', line 28

def start
  callback = method(:receive_data)

  @loop = Coolio::Loop.new backend: :epoll

  $log.debug "listening udp socket on #{@bind}:#{@port}"
  @usock = SocketUtil.create_udp_socket(@bind)
  @usock.bind(@bind, @port)

  @handler = UdpHandler.new(@usock, @max_message_size, callback)
  @loop.attach(@handler)

  @thread = Thread.new(&method(:run))
end