Class: InfluxDB::Client

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/influxdb/client.rb

Constant Summary

Constants included from Logging

Logging::PREFIX

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

logger, logger=

Constructor Details

#initialize(*args) ⇒ Client

Initializes a new InfluxDB client

Examples:

InfluxDB::Client.new                               # connect to localhost using root/root
                                                   # as the credentials, without selecting a db

InfluxDB::Client.new 'db'                          # connect to localhost using root/root
                                                   # as the credentials and 'db' as the db name

InfluxDB::Client.new :username => 'username'       # override username, other defaults remain unchanged

InfluxDB::Client.new 'db', :username => 'username' # override username, use 'db' as the db name

InfluxDB::Client.new 'db', :path => '/prefix'      # use the specified path prefix when building the
                                                   # url e.g.: /prefix/db/dbname...

Valid options in hash

:host

the hostname to connect to

:port

the port to connect to

:username

the username to use when executing commands

:password

the password associated with the username

:use_ssl

use ssl to connect?

:verify_ssl

verify ssl server certificate?

:ssl_ca_cert

ssl CA certificate, chainfile or CA path. The system CA path is automatically included.



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/influxdb/client.rb', line 52

# Builds a client from an optional database name and an optional options hash.
#
# A leading String argument is stored as the database name; a trailing Hash
# supplies the options. Any option that is missing falls back to the default
# visible on the corresponding line below.
#
# @param args [Array] optional database name (String) and/or options (Hash)
def initialize(*args)
  @database = args.first if args.first.is_a? String
  opts = args.last.is_a?(Hash) ? args.last : {}
  @hosts = Array(opts[:hosts] || opts[:host] || ["localhost"])
  @port = opts[:port] || 8086
  @path = opts[:path] || ""
  @username = opts[:username] || "root"
  @password = opts[:password] || "root"
  # Only "params" and "basic_auth" are accepted; anything else becomes "params".
  @auth_method = %w{params basic_auth}.include?(opts[:auth_method]) ? opts[:auth_method] : "params"
  @use_ssl = opts[:use_ssl] || false
  # fetch (not ||) so that an explicit false is respected.
  @verify_ssl = opts.fetch(:verify_ssl, true)
  @ssl_ca_cert = opts[:ssl_ca_cert] || false
  @time_precision = opts[:time_precision] || "s"
  @initial_delay = opts[:initial_delay] || 0.01
  @max_delay = opts[:max_delay] || 30
  # :open_timeout is the key matching this setting; :write_timeout is still
  # honoured because it was the only key read here previously.
  @open_timeout = opts[:open_timeout] || opts[:write_timeout] || 5
  @read_timeout = opts[:read_timeout] || 300
  @async = opts[:async] || false
  # Normalise :retry to an Integer: true/nil => -1, false => 0.
  # NOTE: any other kind of value leaves @retry nil, and the comparison at the
  # end of this method then raises NoMethodError.
  requested_retry = opts.fetch(:retry, nil)
  @retry = case requested_retry
  when Integer
    requested_retry
  when true, nil
    -1
  when false
    0
  end

  @worker = InfluxDB::Worker.new(self) if @async
  self.udp_client = opts[:udp] ? InfluxDB::UDPClient.new(opts[:udp][:host], opts[:udp][:port]) : nil

  at_exit { stop! } if @retry > 0
end

Instance Attribute Details

#auth_method ⇒ Object

Returns the value of attribute auth_method.



10
11
12
# File 'lib/influxdb/client.rb', line 10

# Reader for the +auth_method+ attribute.
# @return [Object] whatever is currently stored in @auth_method
def auth_method; @auth_method; end

#database ⇒ Object

Returns the value of attribute database.



10
11
12
# File 'lib/influxdb/client.rb', line 10

# Reader for the +database+ attribute.
# @return [Object] whatever is currently stored in @database
def database; @database; end

#hosts ⇒ Object

Returns the value of attribute hosts.



10
11
12
# File 'lib/influxdb/client.rb', line 10

# Reader for the +hosts+ attribute.
# @return [Object] whatever is currently stored in @hosts
def hosts; @hosts; end

#password ⇒ Object

Returns the value of attribute password.



10
11
12
# File 'lib/influxdb/client.rb', line 10

# Reader for the +password+ attribute.
# @return [Object] whatever is currently stored in @password
def password; @password; end

#port ⇒ Object

Returns the value of attribute port.



10
11
12
# File 'lib/influxdb/client.rb', line 10

# Reader for the +port+ attribute.
# @return [Object] whatever is currently stored in @port
def port; @port; end

#queue ⇒ Object

Returns the value of attribute queue.



22
23
24
# File 'lib/influxdb/client.rb', line 22

# Reader for the +queue+ attribute.
# @return [Object] whatever is currently stored in @queue
def queue; @queue; end

#ssl_ca_cert ⇒ Object

Returns the value of attribute ssl_ca_cert.



10
11
12
# File 'lib/influxdb/client.rb', line 10

# Reader for the +ssl_ca_cert+ attribute.
# @return [Object] whatever is currently stored in @ssl_ca_cert
def ssl_ca_cert; @ssl_ca_cert; end

#stopped ⇒ Object

Returns the value of attribute stopped.



10
11
12
# File 'lib/influxdb/client.rb', line 10

# Reader for the +stopped+ attribute.
# @return [Object] whatever is currently stored in @stopped
def stopped; @stopped; end

#time_precision ⇒ Object

Returns the value of attribute time_precision.



10
11
12
# File 'lib/influxdb/client.rb', line 10

# Reader for the +time_precision+ attribute.
# @return [Object] whatever is currently stored in @time_precision
def time_precision; @time_precision; end

#udp_client ⇒ Object

Returns the value of attribute udp_client.



22
23
24
# File 'lib/influxdb/client.rb', line 22

# Reader for the +udp_client+ attribute.
# @return [Object] whatever is currently stored in @udp_client
def udp_client; @udp_client; end

#use_ssl ⇒ Object

Returns the value of attribute use_ssl.



10
11
12
# File 'lib/influxdb/client.rb', line 10

# Reader for the +use_ssl+ attribute.
# @return [Object] whatever is currently stored in @use_ssl
def use_ssl; @use_ssl; end

#username ⇒ Object

Returns the value of attribute username.



10
11
12
# File 'lib/influxdb/client.rb', line 10

# Reader for the +username+ attribute.
# @return [Object] whatever is currently stored in @username
def username; @username; end

#verify_ssl ⇒ Object

Returns the value of attribute verify_ssl.



10
11
12
# File 'lib/influxdb/client.rb', line 10

# Reader for the +verify_ssl+ attribute.
# @return [Object] whatever is currently stored in @verify_ssl
def verify_ssl; @verify_ssl; end

#worker=(value) ⇒ Object

Sets the attribute worker

Parameters:

  • value

    the value to set the attribute worker to.



22
23
24
# File 'lib/influxdb/client.rb', line 22

# Writer for the +worker+ attribute.
# @param value [Object] the object to store in @worker
# @return [Object] the assigned value
def worker=(value); @worker = value; end

Instance Method Details

#_write(payload, time_precision = @time_precision) ⇒ Object



303
304
305
306
307
# File 'lib/influxdb/client.rb', line 303

# Serialises +payload+ as JSON and hands it to +post+, addressed at the
# "/db/<@database>/series" URL built by +full_url+.
#
# @param payload [Object] anything JSON.generate can serialise
# @param time_precision [Object] forwarded to +full_url+ as :time_precision
# @return whatever +post+ returns
def _write(payload, time_precision=@time_precision)
  post(
    full_url("/db/#{@database}/series", :time_precision => time_precision),
    JSON.generate(payload)
  )
end

#alter_database_privilege(database, username, admin = true) ⇒ Object



157
158
159
# File 'lib/influxdb/client.rb', line 157

# Sets the :admin flag of a database user by delegating to
# +update_database_user+.
#
# @param database [Object] database name
# @param username [Object] user name
# @param admin [Object] value sent as :admin (defaults to true)
# @return whatever +update_database_user+ returns
def alter_database_privilege(database, username, admin=true)
  privileges = {:admin => admin}
  update_database_user(database, username, privileges)
end

#authenticate_cluster_admin ⇒ Object



105
106
107
# File 'lib/influxdb/client.rb', line 105

# Calls +get+ on the '/cluster_admins/authenticate' URL, passing +true+ as
# the second argument.
#
# @return whatever +get+ returns
def authenticate_cluster_admin
  url = full_url('/cluster_admins/authenticate')
  get(url, true)
end

#authenticate_database_user(database) ⇒ Object



129
130
131
# File 'lib/influxdb/client.rb', line 129

# Calls +get+ on the "/db/<database>/authenticate" URL, passing +true+ as
# the second argument.
#
# @param database [Object] database name interpolated into the path
# @return whatever +get+ returns
def authenticate_database_user(database)
  url = full_url("/db/#{database}/authenticate")
  get(url, true)
end

#configure_database(database_name, options = {}) ⇒ Object



241
242
243
244
245
246
# File 'lib/influxdb/client.rb', line 241

# Posts a database configuration: the result of
# +default_database_configuration+ with +options+ merged on top, as JSON.
#
# @param database_name [Object] name interpolated into the URL path
# @param options [Hash] entries that override the default configuration
# @return whatever +post+ returns
def configure_database(database_name, options = {})
  post(
    full_url("/cluster/database_configs/#{database_name}"),
    JSON.generate(default_database_configuration.merge(options))
  )
end

#continuous_queries(database) ⇒ Object

NOTE: Only cluster admin can call this



162
163
164
# File 'lib/influxdb/client.rb', line 162

# Calls +get+ on the "/db/<database>/continuous_queries" URL.
#
# @param database [Object] database name interpolated into the path
# @return whatever +get+ returns
def continuous_queries(database)
  url = full_url("/db/#{database}/continuous_queries")
  get(url)
end

#create_cluster_admin(username, password) ⇒ Object



109
110
111
112
113
# File 'lib/influxdb/client.rb', line 109

# Posts a JSON object with :name and :password to the "/cluster_admins" URL.
#
# @param username [Object] value sent as :name
# @param password [Object] value sent as :password
# @return whatever +post+ returns
def create_cluster_admin(username, password)
  credentials = {:name => username, :password => password}
  post(full_url("/cluster_admins"), JSON.generate(credentials))
end

#create_continuous_query(query, name) ⇒ Object

EXAMPLE:

db.create_continuous_query(

"select mean(sys) as sys, mean(usr) as usr from cpu group by time(15m)",
"cpu.15m",

)

NOTE: Only cluster admin can call this



183
184
185
# File 'lib/influxdb/client.rb', line 183

# Runs "<query> into <name>" through the +query+ method.
#
# @param query [Object] the select statement
# @param name [Object] the target name appended after "into"
# @return whatever the +query+ method returns
def create_continuous_query(query, name)
  statement = "#{query} into #{name}"
  # Parentheses make this a method call even though a local named +query+ exists.
  query(statement)
end

#create_database(name, options = {}) ⇒ Object

Allows options, e.g. influxdb.create_database('foo', replicationFactor: 3)



91
92
93
94
95
# File 'lib/influxdb/client.rb', line 91

# Posts +options+ as JSON to the "/cluster/database_configs/<name>" URL.
#
# @param name [Object] database name interpolated into the path
# @param options [Hash] serialised as the request data (defaults to {})
# @return whatever +post+ returns
def create_database(name, options = {})
  post(full_url("/cluster/database_configs/#{name}"), JSON.generate(options))
end

#create_database_user(database, username, password, options = {}) ⇒ Object



133
134
135
136
137
# File 'lib/influxdb/client.rb', line 133

# Posts a JSON object with :name and :password, plus any +options+ merged on
# top, to the "/db/<database>/users" URL.
#
# @param database [Object] database name interpolated into the path
# @param username [Object] value sent as :name
# @param password [Object] value sent as :password
# @param options [Hash] extra entries merged into the data
# @return whatever +post+ returns
def create_database_user(database, username, password, options={})
  user_attributes = {:name => username, :password => password}.merge(options)
  post(full_url("/db/#{database}/users"), JSON.generate(user_attributes))
end

#create_shard_space(database_name, options = {}) ⇒ Object



208
209
210
211
212
213
# File 'lib/influxdb/client.rb', line 208

# Posts shard space settings: the result of +default_shard_space_options+
# with +options+ merged on top, as JSON, to
# "/cluster/shard_spaces/<database_name>".
#
# @param database_name [Object] name interpolated into the path
# @param options [Hash] entries that override the defaults
# @return whatever +post+ returns
def create_shard_space(database_name, options = {})
  post(
    full_url("/cluster/shard_spaces/#{database_name}"),
    JSON.generate(default_shard_space_options.merge(options))
  )
end

#default_database_configuration ⇒ Object



248
249
250
# File 'lib/influxdb/client.rb', line 248

# Default database configuration: a hash whose :spaces entry is a one-element
# array holding the result of +default_shard_space_options+.
#
# @return [Hash]
def default_database_configuration
  spaces = [default_shard_space_options]
  {:spaces => spaces}
end

#default_shard_space_options ⇒ Object



230
231
232
233
234
235
236
237
238
239
# File 'lib/influxdb/client.rb', line 230

# Default shard space settings, returned as a new Hash on every call.
#
# @return [Hash{String => Object}]
def default_shard_space_options
  defaults = {}
  defaults["name"] = "default"
  defaults["regEx"] = "/.*/"
  defaults["retentionPolicy"] = "inf"
  defaults["shardDuration"] = "7d"
  defaults["replicationFactor"] = 1
  defaults["split"] = 1
  defaults
end

#delete_cluster_admin(username) ⇒ Object



121
122
123
# File 'lib/influxdb/client.rb', line 121

# Calls +delete+ on the "/cluster_admins/<username>" URL.
#
# @param username [Object] admin name interpolated into the path
# @return whatever +delete+ returns
def delete_cluster_admin(username)
  url = full_url("/cluster_admins/#{username}")
  delete(url)
end

#delete_continuous_query(id) ⇒ Object

NOTE: Only cluster admin can call this



193
194
195
# File 'lib/influxdb/client.rb', line 193

# Runs "drop continuous query <id>" through the +query+ method.
#
# @param id [Object] identifier interpolated into the statement
# @return whatever the +query+ method returns
def delete_continuous_query(id)
  statement = "drop continuous query #{id}"
  query(statement)
end

#delete_database(name) ⇒ Object



97
98
99
# File 'lib/influxdb/client.rb', line 97

# Calls +delete+ on the "/db/<name>" URL.
#
# @param name [Object] database name interpolated into the path
# @return whatever +delete+ returns
def delete_database(name)
  url = full_url("/db/#{name}")
  delete(url)
end

#delete_database_user(database, username) ⇒ Object



145
146
147
# File 'lib/influxdb/client.rb', line 145

# Calls +delete+ on the "/db/<database>/users/<username>" URL.
#
# @param database [Object] database name interpolated into the path
# @param username [Object] user name interpolated into the path
# @return whatever +delete+ returns
def delete_database_user(database, username)
  url = full_url("/db/#{database}/users/#{username}")
  delete(url)
end

#delete_series(series) ⇒ Object



325
326
327
# File 'lib/influxdb/client.rb', line 325

# Calls +delete+ on the "/db/<@database>/series/<series>" URL.
#
# @param series [Object] series name interpolated into the path
# @return whatever +delete+ returns
def delete_series(series)
  url = full_url("/db/#{@database}/series/#{series}")
  delete(url)
end

#delete_shard(shard_id, server_ids) ⇒ Object



170
171
172
173
# File 'lib/influxdb/client.rb', line 170

# Calls +delete+ on the "/cluster/shards/<shard_id>" URL, passing a JSON
# object {"serverIds": server_ids} as the second argument.
#
# @param shard_id [Object] shard identifier interpolated into the path
# @param server_ids [Object] value serialised under "serverIds"
# @return whatever +delete+ returns
def delete_shard(shard_id, server_ids)
  body = JSON.generate({"serverIds" => server_ids})
  url = full_url("/cluster/shards/#{shard_id}")
  delete(url, body)
end

#delete_shard_space(database_name, shard_space_name) ⇒ Object



215
216
217
# File 'lib/influxdb/client.rb', line 215

# Calls +delete+ on the
# "/cluster/shard_spaces/<database_name>/<shard_space_name>" URL.
#
# @param database_name [Object] interpolated into the path
# @param shard_space_name [Object] interpolated into the path
# @return whatever +delete+ returns
def delete_shard_space(database_name, shard_space_name)
  url = full_url("/cluster/shard_spaces/#{database_name}/#{shard_space_name}")
  delete(url)
end

#generate_payload(name, data) ⇒ Object



289
290
291
292
293
294
295
296
297
298
299
300
301
# File 'lib/influxdb/client.rb', line 289

# Builds the payload hash for one series from one point or a list of points.
#
# The column list is the union of all point keys, ordered by their string
# form. Each point becomes one row whose values follow that column order;
# every value is passed through InfluxDB::PointValue#dump, and a key missing
# from a point is looked up as point[column] (nil for a plain Hash).
#
# An empty list of points yields empty :points and :columns instead of
# raising.
#
# @param name [Object] stored under :name
# @param data [Hash, Array<Hash>] a single point or a list of points
# @return [Hash] with keys :name, :points and :columns
def generate_payload(name, data)
  points = data.is_a?(Array) ? data : [data]
  # Collect keys directly rather than merging every point into one hash;
  # this keeps first-seen order and also works when there are no points.
  columns = points.flat_map(&:keys).uniq.sort { |a, b| a.to_s <=> b.to_s }

  rows = points.map do |point|
    columns.map { |column| InfluxDB::PointValue.new(point[column]).dump }
  end

  {:name => name, :points => rows, :columns => columns}
end

#get_cluster_admin_list ⇒ Object



125
126
127
# File 'lib/influxdb/client.rb', line 125

# Calls +get+ on the "/cluster_admins" URL.
#
# @return whatever +get+ returns
def get_cluster_admin_list
  url = full_url("/cluster_admins")
  get(url)
end

#get_continuous_query_list ⇒ Object

NOTE: Only cluster admin can call this



188
189
190
# File 'lib/influxdb/client.rb', line 188

# Runs "list continuous queries" through the +query+ method.
#
# @return whatever the +query+ method returns
def get_continuous_query_list
  statement = "list continuous queries"
  query(statement)
end

#get_database_list ⇒ Object



101
102
103
# File 'lib/influxdb/client.rb', line 101

# Calls +get+ on the "/db" URL.
#
# @return whatever +get+ returns
def get_database_list
  url = full_url("/db")
  get(url)
end

#get_database_user_info(database, username) ⇒ Object



153
154
155
# File 'lib/influxdb/client.rb', line 153

# Calls +get+ on the "/db/<database>/users/<username>" URL.
#
# @param database [Object] database name interpolated into the path
# @param username [Object] user name interpolated into the path
# @return whatever +get+ returns
def get_database_user_info(database, username)
  get full_url("/db/#{database}/users/#{username}")
end

#get_database_user_list(database) ⇒ Object



149
150
151
# File 'lib/influxdb/client.rb', line 149

# Calls +get+ on the "/db/<database>/users" URL.
#
# @param database [Object] database name interpolated into the path
# @return whatever +get+ returns
def get_database_user_list(database)
  url = full_url("/db/#{database}/users")
  get(url)
end

#get_shard_list ⇒ Object



166
167
168
# File 'lib/influxdb/client.rb', line 166

# Calls +get+ on the "/cluster/shards" URL.
#
# @return whatever +get+ returns
def get_shard_list
  url = full_url("/cluster/shards")
  get(url)
end

#get_shard_space(database_name, shard_space_name) ⇒ Object



201
202
203
204
205
206
# File 'lib/influxdb/client.rb', line 201

# Returns the first entry of +get_shard_space_list+ whose "database" and
# "name" values equal the given arguments.
#
# @param database_name [Object] compared with each entry's "database"
# @param shard_space_name [Object] compared with each entry's "name"
# @return the matching entry, or nil when none matches
def get_shard_space(database_name, shard_space_name)
  get_shard_space_list.find do |candidate|
    next false unless candidate["database"] == database_name

    candidate["name"] == shard_space_name
  end
end

#get_shard_space_list ⇒ Object



197
198
199
# File 'lib/influxdb/client.rb', line 197

# Calls +get+ on the "/cluster/shard_spaces" URL.
#
# @return whatever +get+ returns
def get_shard_space_list
  url = full_url("/cluster/shard_spaces")
  get(url)
end

#ping ⇒ Object



86
87
88
# File 'lib/influxdb/client.rb', line 86

# Calls +get+ with the literal path "/ping" (the path is not passed through
# +full_url+).
#
# @return whatever +get+ returns
def ping
  get("/ping")
end

#query(query, time_precision = @time_precision) ⇒ Object



309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
# File 'lib/influxdb/client.rb', line 309

# Fetches series for +query+ from the "/db/<@database>/series" URL and
# denormalises each one with +denormalize_series+.
#
# With a block, yields (name, denormalised series) for every returned series
# and returns the result of iterating the raw series. Without a block,
# returns a Hash mapping each series name to its denormalised form.
#
# @param query [Object] sent to +full_url+ as :q
# @param time_precision [Object] sent to +full_url+ as :time_precision
# @yield [name, points] series name and denormalised series
# @return [Hash, Object]
def query(query, time_precision=@time_precision)
  series = get(full_url("/db/#{@database}/series", :q => query, :time_precision => time_precision))

  return series.each { |s| yield s['name'], denormalize_series(s) } if block_given?

  series.each_with_object({}) do |s, by_name|
    by_name[s['name']] = denormalize_series(s)
  end
end

#stop! ⇒ Object



329
330
331
# File 'lib/influxdb/client.rb', line 329

# Sets the @stopped flag to true.
# @return [true]
def stop!; @stopped = true; end

#stopped? ⇒ Boolean

Returns:

  • (Boolean)


333
334
335
# File 'lib/influxdb/client.rb', line 333

# Reports the @stopped flag.
# @return [Object] the raw value of @stopped (nil until it has been set)
def stopped?; @stopped; end

#update_cluster_admin(username, password) ⇒ Object



115
116
117
118
119
# File 'lib/influxdb/client.rb', line 115

# Posts a JSON object with :password to the "/cluster_admins/<username>" URL.
#
# @param username [Object] admin name interpolated into the path
# @param password [Object] value sent as :password
# @return whatever +post+ returns
def update_cluster_admin(username, password)
  changes = {:password => password}
  post(full_url("/cluster_admins/#{username}"), JSON.generate(changes))
end

#update_database_user(database, username, options = {}) ⇒ Object



139
140
141
142
143
# File 'lib/influxdb/client.rb', line 139

# Posts +options+ as JSON to the "/db/<database>/users/<username>" URL.
#
# @param database [Object] database name interpolated into the path
# @param username [Object] user name interpolated into the path
# @param options [Hash] serialised as the request data (defaults to {})
# @return whatever +post+ returns
def update_database_user(database, username, options = {})
  post(full_url("/db/#{database}/users/#{username}"), JSON.generate(options))
end

#update_shard_space(database_name, shard_space_name, options) ⇒ Object

Get the shard space first, so the user doesn’t have to specify the existing options



220
221
222
223
224
225
226
227
228
# File 'lib/influxdb/client.rb', line 220

# Updates a shard space by posting its current settings with +options+ merged
# on top, so callers only pass the values they want to change.
#
# The current settings come from +get_shard_space+; the "database" entry is
# left out of the posted data.
#
# @param database_name [Object] interpolated into the path
# @param shard_space_name [Object] interpolated into the path
# @param options [Hash] entries that override the current settings
# @return whatever +post+ returns
# @raise [ArgumentError] when +get_shard_space+ finds no matching shard space
def update_shard_space(database_name, shard_space_name, options)
  current = get_shard_space(database_name, shard_space_name)
  # get_shard_space returns nil when nothing matches; fail with a clear
  # message instead of a NoMethodError on nil.
  if current.nil?
    raise ArgumentError,
          "shard space #{shard_space_name.inspect} not found in database #{database_name.inspect}"
  end

  # Build a new hash rather than deleting from the one that was fetched.
  shard_space_options = current.reject { |key, _value| key == "database" }

  url  = full_url("/cluster/shard_spaces/#{database_name}/#{shard_space_name}")
  data = JSON.generate(shard_space_options.merge(options))

  post(url, data)
end

#write_point(name, data, async = @async, time_precision = @time_precision) ⇒ Object



252
253
254
# File 'lib/influxdb/client.rb', line 252

# Writes one series by wrapping it in a single-element list and delegating
# to +write_points+.
#
# @param name [Object] series name
# @param data [Object] point data for the series
# @param async [Object] forwarded to +write_points+ (defaults to @async)
# @param time_precision [Object] forwarded to +write_points+ (defaults to @time_precision)
# @return whatever +write_points+ returns
def write_point(name, data, async=@async, time_precision=@time_precision)
  single_series = {:name => name, :data => data}
  write_points([single_series], async, time_precision)
end

#write_points(name_data_hashes_array, async = @async, time_precision = @time_precision) ⇒ Object

Example: db.write_points(

[
    {
        name: 'first_name',
        data: {
            value: 'val1'
        }
    },
    {
        name: 'first_name',
        data: {
            value: 'val1'
        }
    }
]

)



273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
# File 'lib/influxdb/client.rb', line 273

# Builds one payload per entry with +generate_payload+ and dispatches the
# list: to +worker.push+ when +async+ is truthy, otherwise to
# +udp_client.send+ when a udp client is set, otherwise to +_write+.
#
# @param name_data_hashes_array [Array<Hash>] entries with :name and :data
# @param async [Object] truthy to hand the payloads to the worker (defaults to @async)
# @param time_precision [Object] passed to +_write+ only (defaults to @time_precision)
# @return whatever the chosen dispatch call returns
def write_points(name_data_hashes_array, async=@async, time_precision=@time_precision)
  payloads = name_data_hashes_array.map do |entry|
    generate_payload(entry[:name], entry[:data])
  end

  return worker.push(payloads) if async
  return udp_client.send(payloads) if udp_client

  _write(payloads, time_precision)
end