Class: Fluent::MetricSenseOutput::Backends::LibratoBackend

Inherits:
Fluent::MetricSenseOutput::Backend show all
Defined in:
lib/fluent/plugin/backends/librato_backend.rb

Constant Summary collapse

METRIC_INITIALIZE_REQUEST_PER_MODE =
[]

Constants inherited from Fluent::MetricSenseOutput::Backend

Fluent::MetricSenseOutput::Backend::UpdateMode

Instance Attribute Summary

Attributes inherited from Fluent::MetricSenseOutput::Backend

#log

Instance Method Summary collapse

Methods inherited from Fluent::MetricSenseOutput::Backend

#shutdown, #start

Constructor Details

#initialize ⇒ LibratoBackend

Returns a new instance of LibratoBackend.



31
32
33
34
# File 'lib/fluent/plugin/backends/librato_backend.rb', line 31

# Runs the parent initializer, then starts with an empty record of
# metrics that have already been set up on the Librato side.
def initialize
  super()
  # Maps metric name => true; filled in by ensure_metric_initialized
  # after a successful setup request.
  @initialized_metrics = {}
end

Instance Method Details

#ensure_metric_initialized(http, name, mode) ⇒ Object



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/fluent/plugin/backends/librato_backend.rb', line 122

# Sends a one-time PUT to /v1/metrics/<name> carrying the request body
# registered for +mode+, and remembers the metric so the request is not
# repeated for the same name.
#
# @param http [#request] object used to send the request
# @param name [String] metric name; CGI-escaped into the request path
# @param mode [Object] index into METRIC_INITIALIZE_REQUEST_PER_MODE
# @return [Object] nil when the metric was already marked as initialized
def ensure_metric_initialized(http, name, mode)
  return if @initialized_metrics[name]

  put = Net::HTTP::Put.new("/v1/metrics/#{CGI.escape(name)}", {})
  put.basic_auth(@librato_user, @librato_token)

  log.trace { "librato initialize metric with mode #{mode}: #{name}" }
  put.body = METRIC_INITIALIZE_REQUEST_PER_MODE[mode]
  put.set_content_type("application/json")
  response = http.request(put)

  # TODO error handling
  if response.code =~ /20./
    # Only a 20x answer marks the metric as done; a failed attempt is
    # retried the next time this metric name is seen.
    @initialized_metrics[name] = true
  else
    log.warn "librato_metrics: #{response.code}: #{response.body}"
  end
end

#write(data) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/fluent/plugin/backends/librato_backend.rb', line 36

# Posts metric entries to the Librato Metrics API as gauges.
#
# Entries are sent in batches of at most @batch_size per POST to
# /v1/metrics. Each entry is destructured as
# (tag, time, value, seg_key, seg_val, mode):
# * with a seg_key, the gauge is named "tag:seg_key" and seg_val (when
#   present) is sent as the gauge's "source";
# * without one, the gauge is named after the tag and has no source.
# ensure_metric_initialized is called for every entry before its batch is
# posted. A response whose code is not "200" is logged as a warning and
# does not raise.
#
# @param data [Enumerable] entries to send, shaped as described above
# @return [Object] whatever data.each_slice returns
def write(data)
  http = Net::HTTP.new('metrics-api.librato.com', 443)
  http.open_timeout = 60
  http.use_ssl = true
  # Requests carry basic-auth credentials, so the server certificate must
  # be verified. An empty X509 store trusts nothing, hence the system's
  # default CA locations are loaded into it.
  http.verify_mode = OpenSSL::SSL::VERIFY_PEER
  ca_store = OpenSSL::X509::Store.new
  ca_store.set_default_paths
  http.cert_store = ca_store
  header = {}

  begin
    # send up to @batch_size entries at once
    data.each_slice(@batch_size) do |slice|
      req = Net::HTTP::Post.new('/v1/metrics', header)
      req.basic_auth @librato_user, @librato_token

      gauges = slice.map do |tag, time, value, seg_key, seg_val, mode|
        name = seg_key ? "#{tag}:#{seg_key}" : tag
        gauge = {
          "name" => name,
          "measure_time" => time,
          "value" => value,
        }
        # The segment value is only meaningful when a segment key exists.
        gauge["source"] = seg_val.to_s if seg_key && seg_val
        ensure_metric_initialized(http, name, mode)
        gauge
      end

      log.trace { "librato metrics: #{gauges.inspect}" }
      req.body = {"gauges"=>gauges}.to_json
      req.set_content_type("application/json")
      res = http.request(req)

      # TODO error handling
      if res.code != "200"
        log.warn "librato_metrics: #{res.code}: #{res.body}"
      end
    end

  ensure
    http.finish if http.started?
  end
end