Class: Fluent::JOpenTsdbOutput
- Inherits:
- Output
- Inheritance chain: Object → Output → Fluent::JOpenTsdbOutput
- Defined in:
- lib/fluent/plugin/out_jopentsdb.rb
Instance Method Summary
- #configure(conf) ⇒ Object
- #connect ⇒ Object
- #emit(tag, es, chain) ⇒ Object
- #put_metric(record) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
11 12 13 |
# File 'lib/fluent/plugin/out_jopentsdb.rb', line 11

# Hands the configuration straight to the parent class; nothing
# plugin-specific is done here.
#
# @param conf [Object] configuration passed in by the caller, forwarded
#   unchanged to the parent implementation
def configure(conf)
  super(conf)
end
#connect ⇒ Object
20 21 22 23 |
# File 'lib/fluent/plugin/out_jopentsdb.rb', line 20

# Opens a TCP connection to @host:@port, keeps it in @socket, and logs the
# endpoint at info level.
#
# NOTE(review): @host and @port are not assigned in any method shown on this
# page; presumably they come from the plugin configuration. Confirm.
def connect
  @socket = TCPSocket.new(@host, @port)
  $log.info("connected to opentsdb at #{@host}:#{@port}")
end
#emit(tag, es, chain) ⇒ Object
30 31 32 33 34 35 36 37 |
# File 'lib/fluent/plugin/out_jopentsdb.rb', line 30

# Sends every record of the event stream through #put_metric, then signals
# the chain to continue.
#
# @param tag [Object] not used by this method
# @param es [#each] event stream yielding (time, record) pairs
# @param chain [#next] called once after all records have been processed
def emit(tag, es, chain)
  # The timestamp of each event is ignored; only the record is forwarded.
  es.each do |_time, entry|
    $log.debug "opentsdb output processing record #{entry}"
    put_metric(entry)
  end
  chain.next
end
#put_metric(record) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/fluent/plugin/out_jopentsdb.rb', line 39

# Serializes one record as JSON and POSTs it to
# http://@host:@port/api/put.
#
# NOTE(review): the local variable names were missing from this listing
# (it read `= record`, `JSON.dump()`, `= { ... }`). `payload` and
# `req_options` are reconstructed from the data flow — confirm against the
# real source file.
#
# @param record [Object] value to send; it is logged and handed to JSON.dump
def put_metric(record)
  $log.debug "#{record}"
  payload = record
  $log.debug payload
  begin
    uri = URI.parse("http://#{@host}:#{@port}/api/put")
    request = Net::HTTP::Post.new(uri)
    request.content_type = "application/json"
    request["Accept"] = "Application/json"
    request.body = JSON.dump(payload)
    # The URL above is always built with the http scheme, so use_ssl
    # evaluates to false here.
    req_options = {
      use_ssl: uri.scheme == "https",
    }
    Net::HTTP.start(uri.hostname, uri.port, req_options) do |http|
      http.request(request)
    end
  rescue Errno::EPIPE, Errno::ECONNRESET => e
    $log.warn("Connection to opentsdb server died", :exception => e, :host => @host, :port => @port)
    # Back off briefly, then re-open the TCP socket through #connect.
    # NOTE(review): the failed request is not re-sent after reconnecting —
    # confirm that dropping the record is intended.
    sleep(2)
    connect
  end
end
#shutdown ⇒ Object
25 26 27 28 |
# File 'lib/fluent/plugin/out_jopentsdb.rb', line 25

# Runs the parent shutdown, then tears down the TCP socket stored in @socket.
#
# Does nothing further when @socket was never set, instead of failing with
# NoMethodError on nil. The socket is closed after the two-way shutdown so
# its file descriptor is released; this happens even if the shutdown call
# itself raises, and that error still propagates to the caller.
def shutdown
  super
  return unless @socket

  begin
    @socket.shutdown(Socket::SHUT_RDWR)
  ensure
    @socket.close
  end
end
#start ⇒ Object
15 16 17 18 |
# File 'lib/fluent/plugin/out_jopentsdb.rb', line 15

# Runs the parent start logic, then opens the connection via #connect.
def start
  super()
  connect
end