Class: Fluent::OpenTsdbOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_opentsdb.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



13
14
15
# File 'lib/fluent/plugin/out_opentsdb.rb', line 13

# Configuration hook. This plugin adds nothing of its own here: the
# configuration object is handed to the parent class as-is.
#
# @param conf the configuration object supplied by the caller
# @return whatever the parent implementation returns
def configure(conf)
  super(conf)
end

#connect ⇒ Object



22
23
24
25
# File 'lib/fluent/plugin/out_opentsdb.rb', line 22

# Opens a TCP connection to the OpenTSDB server at @host:@port and stores it
# in @socket.
#
# put_metric calls this again after a write failure, so a socket left over
# from an earlier connection is closed here instead of lingering until it is
# garbage collected. The new socket is opened first: if that raises, @socket
# still refers to the previous connection, exactly as before.
#
# @return the result of the log call
# @raise [SystemCallError, SocketError] if the connection cannot be opened
def connect
  previous = @socket
  @socket = TCPSocket.new(@host, @port)

  begin
    previous.close unless previous.nil? || previous.closed?
  rescue IOError, SystemCallError
    # The old connection was already broken; there is nothing left to release.
  end

  $log.info "connected to opentsdb at #{@host}:#{@port}"
end

#emit(tag, es, chain) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/fluent/plugin/out_opentsdb.rb', line 32

# Turns every field of every record in the event stream into one call to
# put_metric, then advances the output chain.
#
# The field name's suffix selects the metric name and the monitor key:
#   "<key>_num"             -> "<prefix>.<metric_num>"
#   "<key>_percentile_<NN>" -> "<prefix>.<metric_durations>.pct<NN>"
#   anything else           -> "<prefix>.<metric_durations>.<last 3 chars>",
#                              with the last 4 chars stripped to get the key
#
# @param tag event tag (not used here)
# @param es event stream yielding (time, record) pairs
# @param chain output chain; #next is called once all records are processed
def emit(tag, es, chain)
  es.each do |time, record|
    $log.debug "opentsdb output processing record #{record}"

    record.each do |metric, value|
      # nil.to_s is "", so this single check covers both nil and blank values.
      value = 0 if value.to_s.empty?

      if metric[-4, 4] == '_num'
        name_parts = [@metric_prefix, @metric_num]
        key_name = metric[0..-5]
      elsif metric[-14..-3] == '_percentile_'
        name_parts = [@metric_prefix, @metric_durations, "pct#{metric[-2, 2]}"]
        key_name = metric[0..-15]
      else
        name_parts = [@metric_prefix, @metric_durations, metric[-3, 3]]
        key_name = metric[0..-5]
      end

      put_metric(name_parts.join('.'), value, time, key_name)
    end
  end

  chain.next
end

#put_metric(name, value, time, monitor_key_name) ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/fluent/plugin/out_opentsdb.rb', line 54

# Writes one data point to @socket as an OpenTSDB "put" line:
#
#   put <name> <time> <value> <tags>
#
# The tag list always starts with "<@monitor_key_tag>=<monitor_key_name>".
# When @tags is set it is read as a comma-separated "key,value,key,value"
# list (spaces are ignored) and each pair is appended as "key=value". An odd
# trailing entry is still emitted as "key=", as it always was.
#
# If the write fails with EPIPE or ECONNRESET the failure is logged, the
# connection is re-opened after a short pause, and the same line is sent one
# more time so the data point is not silently lost. A second failure is
# logged and reconnected the same way but not retried again.
#
# @param name metric name
# @param value metric value
# @param time timestamp of the data point
# @param monitor_key_name value for the @monitor_key_tag tag
def put_metric(name, value, time, monitor_key_name)
  tags = [[@monitor_key_tag, monitor_key_name].join('=')]
  unless @tags.nil?
    @tags.delete(' ').split(',').each_slice(2) do |key, val|
      tags << "#{key}=#{val}"
    end
  end

  # Joining the tag list here (rather than appending "key=value " chunks)
  # keeps the line free of a trailing space.
  message = ['put', name, time, value, *tags].join(' ')
  #$log.debug message

  retried = false
  begin
    @socket.puts(message)
  rescue Errno::EPIPE, Errno::ECONNRESET => e
    $log.warn("Connection to opentsdb server died",
                 :exception => e, :host => @host, :port => @port)
    sleep(2)
    connect
    # Bounded retry: resend once on the fresh connection, never loop.
    unless retried
      retried = true
      retry
    end
  end
end

#shutdown ⇒ Object



27
28
29
30
# File 'lib/fluent/plugin/out_opentsdb.rb', line 27

# Shutdown hook: lets the parent class shut down first, then tears down the
# OpenTSDB connection held in @socket.
#
# Does nothing further when no socket was ever opened or it is already
# closed. A peer that has already dropped the connection (ENOTCONN) is
# tolerated, and the socket is always closed so its descriptor is released.
def shutdown
  super
  return if @socket.nil? || @socket.closed?

  begin
    @socket.shutdown(Socket::SHUT_RDWR)
  rescue Errno::ENOTCONN
    # The peer is already gone; there is nothing left to shut down.
  ensure
    @socket.close
  end
end

#start ⇒ Object



17
18
19
20
# File 'lib/fluent/plugin/out_opentsdb.rb', line 17

# Startup hook: runs the parent class's start logic first, then opens the
# connection via #connect.
#
# @return whatever #connect returns
def start
  super()
  connect
end