Class: InfluxDB::Client
- Inherits:
-
Object
- Object
- InfluxDB::Client
- Includes:
- Logging
- Defined in:
- lib/influxdb/client.rb
Constant Summary
Constants included from Logging
Instance Attribute Summary collapse
-
#auth_method ⇒ Object
Returns the value of attribute auth_method.
-
#database ⇒ Object
Returns the value of attribute database.
-
#hosts ⇒ Object
Returns the value of attribute hosts.
-
#password ⇒ Object
Returns the value of attribute password.
-
#port ⇒ Object
Returns the value of attribute port.
-
#queue ⇒ Object
Returns the value of attribute queue.
-
#ssl_ca_cert ⇒ Object
Returns the value of attribute ssl_ca_cert.
-
#stopped ⇒ Object
Returns the value of attribute stopped.
-
#time_precision ⇒ Object
Returns the value of attribute time_precision.
-
#udp_client ⇒ Object
Returns the value of attribute udp_client.
-
#use_ssl ⇒ Object
Returns the value of attribute use_ssl.
-
#username ⇒ Object
Returns the value of attribute username.
-
#verify_ssl ⇒ Object
Returns the value of attribute verify_ssl.
-
#worker ⇒ Object
writeonly
Sets the attribute worker.
Instance Method Summary collapse
- #_write(payload, time_precision = @time_precision) ⇒ Object
- #alter_database_privilege(database, username, admin = true) ⇒ Object
- #authenticate_cluster_admin ⇒ Object
- #authenticate_database_user(database) ⇒ Object
- #configure_database(database_name, options = {}) ⇒ Object
-
#continuous_queries(database) ⇒ Object
NOTE: Only cluster admin can call this.
- #create_cluster_admin(username, password) ⇒ Object
-
#create_continuous_query(query, name) ⇒ Object
EXAMPLE:.
-
#create_database(name, options = {}) ⇒ Object
allow options, e.g.
- #create_database_user(database, username, password, options = {}) ⇒ Object
- #create_shard_space(database_name, options = {}) ⇒ Object
- #default_database_configuration ⇒ Object
- #default_shard_space_options ⇒ Object
- #delete_cluster_admin(username) ⇒ Object
-
#delete_continuous_query(id) ⇒ Object
NOTE: Only cluster admin can call this.
- #delete_database(name) ⇒ Object
- #delete_database_user(database, username) ⇒ Object
- #delete_series(series) ⇒ Object
- #delete_shard(shard_id, server_ids) ⇒ Object
- #delete_shard_space(database_name, shard_space_name) ⇒ Object
- #generate_payload(name, data) ⇒ Object
- #get_cluster_admin_list ⇒ Object
-
#get_continuous_query_list ⇒ Object
NOTE: Only cluster admin can call this.
- #get_database_list ⇒ Object
- #get_database_user_info(database, username) ⇒ Object
- #get_database_user_list(database) ⇒ Object
- #get_shard_list ⇒ Object
- #get_shard_space(database_name, shard_space_name) ⇒ Object
- #get_shard_space_list ⇒ Object
-
#initialize(*args) ⇒ Client
constructor
Initializes a new InfluxDB client.
- #ping ⇒ Object
- #query(query, time_precision = @time_precision) ⇒ Object
- #stop! ⇒ Object
- #stopped? ⇒ Boolean
- #update_cluster_admin(username, password) ⇒ Object
- #update_database_user(database, username, options = {}) ⇒ Object
-
#update_shard_space(database_name, shard_space_name, options) ⇒ Object
Get the shard space first, so the user doesn’t have to specify the existing options.
- #write_point(name, data, async = @async, time_precision = @time_precision) ⇒ Object
-
#write_points(name_data_hashes_array, async = @async, time_precision = @time_precision) ⇒ Object
Example: db.write_points( [ { name: ‘first_name’, data: { value: ‘val1’ } }, { name: ‘first_name’, data: { value: ‘val1’ } } ] ).
Methods included from Logging
Constructor Details
#initialize(*args) ⇒ Client
Initializes a new InfluxDB client
Examples:
InfluxDB::Client.new # connect to localhost using root/root
# as the credentials and doesn't connect to a db
InfluxDB::Client.new 'db' # connect to localhost using root/root
# as the credentials and 'db' as the db name
InfluxDB::Client.new :username => 'username' # override username, other defaults remain unchanged
Influxdb::Client.new 'db', :username => 'username' # override username, use 'db' as the db name
Influxdb::Client.new 'db', :path => '/prefix' # use the specified path prefix when building the
# url e.g.: /prefix/db/dbname...
Valid options in hash
:host
-
the hostname to connect to
:port
-
the port to connect to
:username
-
the username to use when executing commands
:password
-
the password associated with the username
:use_ssl
-
use ssl to connect?
:verify_ssl
-
verify ssl server certificate?
:ssl_ca_cert
-
ssl CA certificate, chainfile or CA path. The system CA path is automatically included.
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/influxdb/client.rb', line 52 def initialize *args @database = args.first if args.first.is_a? String opts = args.last.is_a?(Hash) ? args.last : {} @hosts = Array(opts[:hosts] || opts[:host] || ["localhost"]) @port = opts[:port] || 8086 @path = opts[:path] || "" @username = opts[:username] || "root" @password = opts[:password] || "root" @auth_method = %w{params basic_auth}.include?(opts[:auth_method]) ? opts[:auth_method] : "params" @use_ssl = opts[:use_ssl] || false @verify_ssl = opts.fetch(:verify_ssl, true) @ssl_ca_cert = opts[:ssl_ca_cert] || false @time_precision = opts[:time_precision] || "s" @initial_delay = opts[:initial_delay] || 0.01 @max_delay = opts[:max_delay] || 30 @open_timeout = opts[:write_timeout] || 5 @read_timeout = opts[:read_timeout] || 300 @async = opts[:async] || false @retry = opts.fetch(:retry, nil) @retry = case @retry when Integer @retry when true, nil -1 when false 0 end @worker = InfluxDB::Worker.new(self) if @async self.udp_client = opts[:udp] ? InfluxDB::UDPClient.new(opts[:udp][:host], opts[:udp][:port]) : nil at_exit { stop! } if @retry > 0 end |
Instance Attribute Details
#auth_method ⇒ Object
Returns the value of attribute auth_method.
10 11 12 |
# File 'lib/influxdb/client.rb', line 10 def auth_method @auth_method end |
#database ⇒ Object
Returns the value of attribute database.
10 11 12 |
# File 'lib/influxdb/client.rb', line 10 def database @database end |
#hosts ⇒ Object
Returns the value of attribute hosts.
10 11 12 |
# File 'lib/influxdb/client.rb', line 10 def hosts @hosts end |
#password ⇒ Object
Returns the value of attribute password.
10 11 12 |
# File 'lib/influxdb/client.rb', line 10 def password @password end |
#port ⇒ Object
Returns the value of attribute port.
10 11 12 |
# File 'lib/influxdb/client.rb', line 10 def port @port end |
#queue ⇒ Object
Returns the value of attribute queue.
22 23 24 |
# File 'lib/influxdb/client.rb', line 22 def queue @queue end |
#ssl_ca_cert ⇒ Object
Returns the value of attribute ssl_ca_cert.
10 11 12 |
# File 'lib/influxdb/client.rb', line 10 def ssl_ca_cert @ssl_ca_cert end |
#stopped ⇒ Object
Returns the value of attribute stopped.
10 11 12 |
# File 'lib/influxdb/client.rb', line 10 def stopped @stopped end |
#time_precision ⇒ Object
Returns the value of attribute time_precision.
10 11 12 |
# File 'lib/influxdb/client.rb', line 10 def time_precision @time_precision end |
#udp_client ⇒ Object
Returns the value of attribute udp_client.
22 23 24 |
# File 'lib/influxdb/client.rb', line 22 def udp_client @udp_client end |
#use_ssl ⇒ Object
Returns the value of attribute use_ssl.
10 11 12 |
# File 'lib/influxdb/client.rb', line 10 def use_ssl @use_ssl end |
#username ⇒ Object
Returns the value of attribute username.
10 11 12 |
# File 'lib/influxdb/client.rb', line 10 def username @username end |
#verify_ssl ⇒ Object
Returns the value of attribute verify_ssl.
10 11 12 |
# File 'lib/influxdb/client.rb', line 10 def verify_ssl @verify_ssl end |
#worker=(value) ⇒ Object
Sets the attribute worker
22 23 24 |
# File 'lib/influxdb/client.rb', line 22 def worker=(value) @worker = value end |
Instance Method Details
#_write(payload, time_precision = @time_precision) ⇒ Object
303 304 305 306 307 |
# File 'lib/influxdb/client.rb', line 303 def _write(payload, time_precision=@time_precision) url = full_url("/db/#{@database}/series", :time_precision => time_precision) data = JSON.generate(payload) post(url, data) end |
#alter_database_privilege(database, username, admin = true) ⇒ Object
157 158 159 |
# File 'lib/influxdb/client.rb', line 157 def alter_database_privilege(database, username, admin=true) update_database_user(database, username, :admin => admin) end |
#authenticate_cluster_admin ⇒ Object
105 106 107 |
# File 'lib/influxdb/client.rb', line 105 def authenticate_cluster_admin get(full_url('/cluster_admins/authenticate'), true) end |
#authenticate_database_user(database) ⇒ Object
129 130 131 |
# File 'lib/influxdb/client.rb', line 129 def authenticate_database_user(database) get(full_url("/db/#{database}/authenticate"), true) end |
#configure_database(database_name, options = {}) ⇒ Object
241 242 243 244 245 246 |
# File 'lib/influxdb/client.rb', line 241 def configure_database(database_name, = {}) url = full_url("/cluster/database_configs/#{database_name}") data = JSON.generate(default_database_configuration.merge()) post(url, data) end |
#continuous_queries(database) ⇒ Object
NOTE: Only cluster admin can call this
162 163 164 |
# File 'lib/influxdb/client.rb', line 162 def continuous_queries(database) get full_url("/db/#{database}/continuous_queries") end |
#create_cluster_admin(username, password) ⇒ Object
109 110 111 112 113 |
# File 'lib/influxdb/client.rb', line 109 def create_cluster_admin(username, password) url = full_url("/cluster_admins") data = JSON.generate({:name => username, :password => password}) post(url, data) end |
#create_continuous_query(query, name) ⇒ Object
EXAMPLE:
db.create_continuous_query(
"select mean(sys) as sys, mean(usr) as usr from cpu group by time(15m)",
"cpu.15m",
)
NOTE: Only cluster admin can call this
183 184 185 |
# File 'lib/influxdb/client.rb', line 183 def create_continuous_query(query, name) query("#{query} into #{name}") end |
#create_database(name, options = {}) ⇒ Object
allow options, e.g. influxdb.create_database(‘foo’, replicationFactor: 3)
91 92 93 94 95 |
# File 'lib/influxdb/client.rb', line 91 def create_database(name, = {}) url = full_url("/cluster/database_configs/#{name}") data = JSON.generate() post(url, data) end |
#create_database_user(database, username, password, options = {}) ⇒ Object
133 134 135 136 137 |
# File 'lib/influxdb/client.rb', line 133 def create_database_user(database, username, password, ={}) url = full_url("/db/#{database}/users") data = JSON.generate({:name => username, :password => password}.merge()) post(url, data) end |
#create_shard_space(database_name, options = {}) ⇒ Object
208 209 210 211 212 213 |
# File 'lib/influxdb/client.rb', line 208 def create_shard_space(database_name, = {}) url = full_url("/cluster/shard_spaces/#{database_name}") data = JSON.generate(.merge()) post(url, data) end |
#default_database_configuration ⇒ Object
248 249 250 |
# File 'lib/influxdb/client.rb', line 248 def default_database_configuration {:spaces => []} end |
#default_shard_space_options ⇒ Object
230 231 232 233 234 235 236 237 238 239 |
# File 'lib/influxdb/client.rb', line 230 def { "name" => "default", "regEx" => "/.*/", "retentionPolicy" => "inf", "shardDuration" => "7d", "replicationFactor" => 1, "split" => 1 } end |
#delete_cluster_admin(username) ⇒ Object
121 122 123 |
# File 'lib/influxdb/client.rb', line 121 def delete_cluster_admin(username) delete full_url("/cluster_admins/#{username}") end |
#delete_continuous_query(id) ⇒ Object
NOTE: Only cluster admin can call this
193 194 195 |
# File 'lib/influxdb/client.rb', line 193 def delete_continuous_query(id) query("drop continuous query #{id}") end |
#delete_database(name) ⇒ Object
97 98 99 |
# File 'lib/influxdb/client.rb', line 97 def delete_database(name) delete full_url("/db/#{name}") end |
#delete_database_user(database, username) ⇒ Object
145 146 147 |
# File 'lib/influxdb/client.rb', line 145 def delete_database_user(database, username) delete full_url("/db/#{database}/users/#{username}") end |
#delete_series(series) ⇒ Object
325 326 327 |
# File 'lib/influxdb/client.rb', line 325 def delete_series(series) delete full_url("/db/#{@database}/series/#{series}") end |
#delete_shard(shard_id, server_ids) ⇒ Object
170 171 172 173 |
# File 'lib/influxdb/client.rb', line 170 def delete_shard(shard_id, server_ids) data = JSON.generate({"serverIds" => server_ids}) delete full_url("/cluster/shards/#{shard_id}"), data end |
#delete_shard_space(database_name, shard_space_name) ⇒ Object
215 216 217 |
# File 'lib/influxdb/client.rb', line 215 def delete_shard_space(database_name, shard_space_name) delete full_url("/cluster/shard_spaces/#{database_name}/#{shard_space_name}") end |
#generate_payload(name, data) ⇒ Object
289 290 291 292 293 294 295 296 297 298 299 300 301 |
# File 'lib/influxdb/client.rb', line 289 def generate_payload(name, data) data = data.is_a?(Array) ? data : [data] columns = data.reduce(:merge).keys.sort {|a,b| a.to_s <=> b.to_s} payload = {:name => name, :points => [], :columns => columns} data.each do |point| payload[:points] << columns.inject([]) do |array, column| array << InfluxDB::PointValue.new(point[column]).dump end end payload end |
#get_cluster_admin_list ⇒ Object
125 126 127 |
# File 'lib/influxdb/client.rb', line 125 def get_cluster_admin_list get full_url("/cluster_admins") end |
#get_continuous_query_list ⇒ Object
NOTE: Only cluster admin can call this
188 189 190 |
# File 'lib/influxdb/client.rb', line 188 def get_continuous_query_list query("list continuous queries") end |
#get_database_list ⇒ Object
101 102 103 |
# File 'lib/influxdb/client.rb', line 101 def get_database_list get full_url("/db") end |
#get_database_user_info(database, username) ⇒ Object
153 154 155 |
# File 'lib/influxdb/client.rb', line 153 def get_database_user_info(database, username) get full_url("/db/#{database}/users/#{username}") end |
#get_database_user_list(database) ⇒ Object
149 150 151 |
# File 'lib/influxdb/client.rb', line 149 def get_database_user_list(database) get full_url("/db/#{database}/users") end |
#get_shard_list ⇒ Object
166 167 168 |
# File 'lib/influxdb/client.rb', line 166 def get_shard_list() get full_url("/cluster/shards") end |
#get_shard_space(database_name, shard_space_name) ⇒ Object
201 202 203 204 205 206 |
# File 'lib/influxdb/client.rb', line 201 def get_shard_space(database_name, shard_space_name) get_shard_space_list.find do |shard_space| shard_space["database"] == database_name && shard_space["name"] == shard_space_name end end |
#get_shard_space_list ⇒ Object
197 198 199 |
# File 'lib/influxdb/client.rb', line 197 def get_shard_space_list get full_url("/cluster/shard_spaces") end |
#ping ⇒ Object
86 87 88 |
# File 'lib/influxdb/client.rb', line 86 def ping get "/ping" end |
#query(query, time_precision = @time_precision) ⇒ Object
309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 |
# File 'lib/influxdb/client.rb', line 309 def query(query, time_precision=@time_precision) url = full_url("/db/#{@database}/series", :q => query, :time_precision => time_precision) series = get(url) if block_given? series.each { |s| yield s['name'], denormalize_series(s) } else series.reduce({}) do |col, s| name = s['name'] denormalized_series = denormalize_series s col[name] = denormalized_series col end end end |
#stop! ⇒ Object
329 330 331 |
# File 'lib/influxdb/client.rb', line 329 def stop! @stopped = true end |
#stopped? ⇒ Boolean
333 334 335 |
# File 'lib/influxdb/client.rb', line 333 def stopped? @stopped end |
#update_cluster_admin(username, password) ⇒ Object
115 116 117 118 119 |
# File 'lib/influxdb/client.rb', line 115 def update_cluster_admin(username, password) url = full_url("/cluster_admins/#{username}") data = JSON.generate({:password => password}) post(url, data) end |
#update_database_user(database, username, options = {}) ⇒ Object
139 140 141 142 143 |
# File 'lib/influxdb/client.rb', line 139 def update_database_user(database, username, = {}) url = full_url("/db/#{database}/users/#{username}") data = JSON.generate() post(url, data) end |
#update_shard_space(database_name, shard_space_name, options) ⇒ Object
Get the shard space first, so the user doesn’t have to specify the existing options
220 221 222 223 224 225 226 227 228 |
# File 'lib/influxdb/client.rb', line 220 def update_shard_space(database_name, shard_space_name, ) = get_shard_space(database_name, shard_space_name) .delete("database") url = full_url("/cluster/shard_spaces/#{database_name}/#{shard_space_name}") data = JSON.generate(.merge()) post(url, data) end |
#write_point(name, data, async = @async, time_precision = @time_precision) ⇒ Object
252 253 254 |
# File 'lib/influxdb/client.rb', line 252 def write_point(name, data, async=@async, time_precision=@time_precision) write_points([{:name => name, :data => data}], async, time_precision) end |
#write_points(name_data_hashes_array, async = @async, time_precision = @time_precision) ⇒ Object
Example: db.write_points(
[
{
name: 'first_name',
data: {
value: 'val1'
}
},
{
name: 'first_name',
data: {
value: 'val1'
}
}
]
)
273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 |
# File 'lib/influxdb/client.rb', line 273 def write_points(name_data_hashes_array, async=@async, time_precision=@time_precision) payloads = [] name_data_hashes_array.each do |attrs| payloads << generate_payload(attrs[:name], attrs[:data]) end if async worker.push(payloads) elsif udp_client udp_client.send(payloads) else _write(payloads, time_precision) end end |