Class: Fluent::JOpenTsdbOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_jopentsdb.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



11
12
13
# File 'lib/fluent/plugin/out_jopentsdb.rb', line 11

def configure(conf)
  super
end

#connectObject



20
21
22
23
# File 'lib/fluent/plugin/out_jopentsdb.rb', line 20

def connect
  @socket = TCPSocket.new(@host, @port)    
  $log.info "connected to opentsdb at #{@host}:#{@port}"
end

#emit(tag, es, chain) ⇒ Object



30
31
32
33
34
35
36
37
# File 'lib/fluent/plugin/out_jopentsdb.rb', line 30

def emit(tag, es, chain)
  es.each do |time,record|
    $log.debug "opentsdb output processing record #{record}"
    put_metric(record)
  end

  chain.next
end

#put_metric(record) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/fluent/plugin/out_jopentsdb.rb', line 39

def put_metric(record)
  $log.debug "#{record}"
  message = record
  $log.debug message
  begin
    uri = URI.parse("http://#{@host}:#{@port}/api/put")
    request = Net::HTTP::Post.new(uri)
    request.content_type = "application/json"
    request["Accept"] = "Application/json"
    request.body = JSON.dump(message)
  req_options = {
  use_ssl: uri.scheme == "https",
    }
                    
  response = Net::HTTP.start(uri.hostname, uri.port, req_options) do |http|
  http.request(request)
  end
                                  
  rescue Errno::EPIPE, Errno::ECONNRESET => e
    $log.warn("Connection to opentsdb server died",
                 :exception => e, :host => @host, :port => @port)
    sleep(2)
    connect
  end
end

#shutdownObject



25
26
27
28
# File 'lib/fluent/plugin/out_jopentsdb.rb', line 25

def shutdown
  super
  @socket.shutdown(Socket::SHUT_RDWR)
end

#startObject



15
16
17
18
# File 'lib/fluent/plugin/out_jopentsdb.rb', line 15

def start
  super
  connect
end