Class: Fluent::TaggedUdpOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_tagged_udp.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object

Called once before the plugin starts. ‘conf’ is a Hash containing the configuration parameters. If the configuration is invalid, this method should raise Fluent::ConfigError.



19
20
21
22
23
24
25
26
27
# File 'lib/fluent/plugin/out_tagged_udp.rb', line 19

# Runs the parent class's configure, then fills in any connection settings
# that are still unset from the raw configuration and creates the UDP socket
# that #emit writes to.
#
# @param conf configuration parameters, read here via conf[name]
def configure(conf)
  super

  # Fall back to the raw conf[name] value only for settings that are still
  # nil or false once super has run.
  %w[host port tag_sep].each do |name|
    ivar = "@#{name}"
    instance_variable_set(ivar, conf[name]) unless instance_variable_get(ivar)
  end

  @socket = UDPSocket.new
end

#emit(tag, es, chain) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/fluent/plugin/out_tagged_udp.rb', line 51

# Sends every event of the stream to @host:@port, one UDP datagram per
# event. Each datagram is the tag, then @tag_sep, then the record serialized
# as JSON with the entries of timestamp_hash(time) merged in.
#
# Errors are not re-raised. They are logged at warn level so that delivery
# failures stay visible when debug logging is off. An error stops processing
# of the remaining events, and chain.next is not called in that case.
#
# @param tag event tag, placed at the head of every datagram
# @param es event stream; its #each yields (time, record) pairs
# @param chain output chain; chain.next is called once all events were sent
def emit(tag, es, chain)
  es.each do |time, record|
    $log.debug "#{tag}, #{format_time(time)}, #{record}"
    # tag is inserted into the head of the message
    payload = "#{tag}#{@tag_sep}#{record.merge(timestamp_hash(time)).to_json}"
    @socket.send(payload, Socket::MSG_EOR, @host, @port)
  end
  $log.flush
  chain.next
rescue StandardError => e
  # Previously reported at debug level only, which hid dropped events.
  $log.warn "#{e.class}: #{e.message}"
end

#format_time(time) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/fluent/plugin/out_tagged_udp.rb', line 29

# Renders an event timestamp according to @time_format.
#
# @param time value handed to Time.at when a format is set
#   (presumably epoch seconds; confirm against the caller)
# @return +time+ itself when @time_format is nil, otherwise the formatted
#   String
def format_time(time)
  # No format configured: hand the raw value back untouched.
  return time if @time_format.nil?

  moment = Time.at(time)
  # "iso8601" is a keyword; any other value is used as a strftime pattern.
  'iso8601' == @time_format ? moment.iso8601 : moment.strftime(@time_format)
end

#timestamp_hash(time) ⇒ Object



43
44
45
46
47
48
49
# File 'lib/fluent/plugin/out_tagged_udp.rb', line 43

# Builds the hash that carries an event's timestamp.
#
# @param time event time, passed through format_time
# @return [Hash] empty when @time_key is nil, otherwise the single pair
#   @time_key => format_time(time)
def timestamp_hash(time)
  return {} if @time_key.nil?

  { @time_key => format_time(time) }
end