Class: MetricsInflux::Engine

Inherits:
Object
  • Object
show all
Defined in:
lib/metrics_influx/engine.rb

Defined Under Namespace

Classes: Error

Instance Method Summary collapse

Constructor Details

#initialize(options, config) ⇒ Engine

Returns a new instance of Engine.



10
11
12
13
14
15
16
17
18
# File 'lib/metrics_influx/engine.rb', line 10

# Stores the configuration and instantiates every configured collector.
#
# @param options [Object] accepted for the caller's convenience; not read here
# @param config [Hash] parsed configuration. Its 'collectors' entry is
#   iterated, and each collector's 'type' and 'config' values are used to
#   build the collector object.
# @return [Engine]
def initialize(options, config)
  @config = config
  @collectors = config['collectors']
  @timers = Timers::Group.new

  @collectors.each do |collector|
    # The collector class is looked up by its configured type name.
    collector_class = MetricsInflux::Module[collector['type']]
    # The built object is stored back into the collector's own hash under
    # the :instance symbol key, next to the string keys read from the config.
    collector[:instance] = collector_class.new(collector['config'])
  end
end

Instance Method Details

#connection ⇒ Object



20
21
22
23
24
25
26
27
28
# File 'lib/metrics_influx/engine.rb', line 20

# Builds the Net::HTTP client for the configured server on first use and
# returns the same object on every later call.
#
# @return [Net::HTTP] client configured from @config['server']
# @raise [ArgumentError] if the configured protocol is neither 'http' nor 'https'
def connection
  return @http if @http

  server = @config['server']
  protocol = server['protocol']
  unless %w[http https].include?(protocol)
    raise ArgumentError, "Unknown InfluxDB protocol #{protocol}"
  end

  client = Net::HTTP.new(server['host'], server['port'])
  client.use_ssl = (protocol == 'https')
  # Certificate verification is switched off only when 'no_verify' is set.
  client.verify_mode = OpenSSL::SSL::VERIFY_NONE if server.fetch('no_verify', false)
  @http = client
end

#do_post!(data) ⇒ Object



42
43
44
45
46
47
48
49
# File 'lib/metrics_influx/engine.rb', line 42

# Serialises +data+ to JSON and POSTs it to the series endpoint of the
# configured database (with the time_precision=s query parameter), using
# HTTP basic auth taken from the server configuration.
#
# @param data [#to_json] payload to send
# @return [nil] when the reply is a Net::HTTPSuccess
# @raise [Error] carrying the response body for any other reply
def do_post!(data)
  path = "/db/#{@config['database']}/series?time_precision=s"
  post = Net::HTTP::Post.new(path)

  server = @config['server']
  post.basic_auth(server['user'], server['pass'])
  post.add_field('Content-Type', 'application/json')
  post.body = data.to_json

  reply = connection.request(post)
  return if reply.is_a?(Net::HTTPSuccess)

  raise Error.new(reply.body)
end

#do_query(q) ⇒ Object



30
31
32
33
34
# File 'lib/metrics_influx/engine.rb', line 30

# Sends the query string +q+ as a GET request to the series endpoint of the
# configured database, using HTTP basic auth from the server configuration.
#
# @param q [String] query text; it is CGI-escaped before being put in the URL
# @return [Object] whatever connection.request returns; the status is not
#   checked here
def do_query(q)
  path = "/db/#{@config['database']}/series?q=#{CGI.escape(q)}"
  get = Net::HTTP::Get.new(path)

  server = @config['server']
  get.basic_auth(server['user'], server['pass'])

  connection.request(get)
end

#run! ⇒ Object



51
52
53
54
55
56
57
58
59
# File 'lib/metrics_influx/engine.rb', line 51

# Registers one recurring timer per distinct collector interval, then keeps
# waiting on the timer group.
#
# Collectors that share an 'interval' value are sampled together by a single
# timer callback.
def run!
  @collectors.group_by { |collector| collector['interval'] }.each_pair do |period, group|
    @timers.every(period) do
      sample(group)
    end
  end

  loop do
    @timers.wait
  end
end

#sample(collectors) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/metrics_influx/engine.rb', line 61

# Takes one sample from each of the given collectors and posts the combined
# result via do_post!.
#
# A :sample future is requested from every collector's :instance first; the
# futures' values are read afterwards, in the same order. A value may be a
# single Hash or a list of Hashes; each Hash becomes one series entry whose
# column names are the Hash keys, prefixed with the collector's 'prefix'
# (if any), and whose single point is the Hash values.
#
# Posting is best-effort: a rejected request or a network-level failure is
# logged and swallowed so that the caller (the timer callback) keeps running.
#
# @param collectors [Array<Hash>] collector entries carrying 'series',
#   optional 'prefix', and the collector object under :instance
# @return [Object, nil] nil when there was nothing to post
def sample(collectors)
  # Request every future before reading any value.
  pending = collectors.map { |coll| { coll: coll, future: coll[:instance].future(:sample) } }

  data = []
  pending.each do |entry|
    coll = entry[:coll]
    rows = entry[:future].value
    # A collector that produced nothing must not take the whole batch down.
    next if rows.nil?

    rows = [rows] if rows.is_a?(Hash)
    prefix = coll['prefix'] || ''
    rows.each do |row|
      data << {
        name:    coll['series'],
        columns: row.keys.map { |key| "#{prefix}#{key}" },
        points:  [row.values]
      }
    end
  end

  # Nothing collected: skip the pointless round trip.
  return if data.empty?

  begin
    do_post! data
  rescue MetricsInflux::Engine::Error, Timeout::Error, SystemCallError, SocketError, EOFError => e
    # Timeout::Error covers Net::OpenTimeout and Net::ReadTimeout;
    # SystemCallError covers Errno::ECONNREFUSED and the other Errno::* errors.
    MetricsInflux.logger.error "Error posting update: #{e.message}"
  end
end

#test_connection! ⇒ Object



36
37
38
39
40
# File 'lib/metrics_influx/engine.rb', line 36

# Checks that the server answers a 'list series' query successfully.
#
# @raise [Error] carrying the response body when the reply is not a
#   Net::HTTPSuccess
def test_connection!
  reply = do_query('list series')

  if reply.is_a?(Net::HTTPSuccess)
    MetricsInflux.logger.debug "influxdb: Connection tested successfully"
  else
    raise Error.new(reply.body)
  end
end