Class: Fluent::TaggedUdpInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_tagged_udp.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object

This method is called before starting. ‘conf’ is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.



23
24
25
26
27
28
29
30
31
# File 'lib/fluent/plugin/in_tagged_udp.rb', line 23

# Applies the plugin configuration.
#
# Delegates to the superclass first, then fills in @bind, @port and
# @tag_sep from the raw configuration for whichever of them is still
# unset (nil or false), and finally builds the parser through
# configure_parser.
#
# @param conf [#[]] configuration parameters, looked up by string key
# @return [Object] whatever configure_parser returns
def configure(conf)
  super

  # Raw parameters are also reachable as conf[name]; they are used here
  # only as a fallback, so values that are already set are never replaced.
  @bind = conf['bind'] unless @bind
  @port = conf['port'] unless @port
  @tag_sep = conf['tag_sep'] unless @tag_sep

  configure_parser(conf)
end

#configure_parser(conf) ⇒ Object



33
34
35
36
# File 'lib/fluent/plugin/in_tagged_udp.rb', line 33

# Builds the parser named by conf['format'] and configures it.
#
# The parser is stored in @parser before it is configured, so @parser is
# set even if configuring it raises.
#
# @param conf [#[]] configuration parameters; 'format' selects the parser
# @return [Object] the result of the parser's own #configure call
def configure_parser(conf)
  format = conf['format']
  @parser = Plugin.new_parser(format)
  @parser.configure(conf)
end

#parse(message) ⇒ Object



38
39
40
41
42
43
44
45
46
# File 'lib/fluent/plugin/in_tagged_udp.rb', line 38

# Parses one message with the configured parser.
#
# Returns from the method as soon as the parser yields its first
# (time, record) pair. When the yielded time is nil, Fluent::Engine.now
# is substituted and a debug line is logged.
#
# @param message [Object] the raw payload handed to @parser.parse
# @return [Array] a two-element [time, record] pair; if the parser never
#   yields, the return value of @parser.parse itself
def parse(message)
  @parser.parse(message) do |time, record|
    # Non-local return: the first yielded pair is the result of #parse.
    return [time, record] unless time.nil?

    $log.debug "Since time_key field is nil, Fluent::Engine.now is used."
    return [Fluent::Engine.now, record]
  end
end

#shutdown ⇒ Object



66
67
68
# File 'lib/fluent/plugin/in_tagged_udp.rb', line 66

# Stops the background listener thread created by #start.
#
# Does nothing when no thread has been started, so calling shutdown
# before start (or after a failed start) does not raise on nil. After
# killing the thread it waits for it to finish, so the thread's cleanup
# has run by the time this method returns.
#
# @return [Thread, nil] the stopped thread, or nil if none was running
def shutdown
  return unless @thread

  @thread.kill
  @thread.join
end

#start ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/fluent/plugin/in_tagged_udp.rb', line 48

# Starts the UDP listener on a background thread stored in @thread.
#
# Each datagram is split into a tag and a payload at the first
# occurrence of @tag_sep; the payload is handed to #parse and the
# resulting (time, record) pair is emitted through the router under
# that tag. A datagram that has no payload after splitting is logged
# and skipped. Any StandardError raised inside the listener is
# re-raised in the thread that called #start.
#
# @return [Thread] the listener thread
def start
  $log.debug "start udp server #{@bind}"

  @thread = Thread.new(Thread.current) do |parent|
    begin
      Socket.udp_server_loop(@bind, @port) do |msg, _msg_src|
        $log.debug("Received #{msg}")

        # Limit the split to two parts: without the limit, a payload that
        # itself contains the separator would be cut at its first occurrence.
        tag, message = msg.split(@tag_sep, 2)

        # No separator (or an empty datagram) leaves nothing to parse; skip
        # it instead of passing nil to the parser and stopping the listener.
        if message.nil?
          $log.warn "Ignoring datagram without tag separator: #{msg}"
          next
        end

        time, record = parse(message)
        $log.debug "#{tag}, #{time}, #{record}"
        router.emit(tag, time, record)
      end
    rescue StandardError => e
      # Surface listener failures in the thread that started the plugin.
      parent.raise(e)
    end
  end
end