Class: DruidConfig::Cluster
- Inherits:
-
Object
- Object
- DruidConfig::Cluster
- Includes:
- Util, HTTParty
- Defined in:
- lib/druid_config/cluster.rb
Overview
Class to initialize the connection to Zookeeper
Instance Method Summary collapse
-
#close! ⇒ Object
Close connection with zookeeper.
-
#datasource(datasource) ⇒ Object
Return a unique datasource.
-
#datasources ⇒ Object
Return all datasources.
-
#historicals ⇒ Object
Returns only historical nodes.
-
#initialize(zk_uri, options) ⇒ Cluster
constructor
Initialize the client to perform the queries.
-
#leader ⇒ Object
Return the leader of the Druid cluster.
-
#load_queue(params = '') ⇒ Object
Load queue of the cluster.
-
#load_status(params = '') ⇒ Object
Load status of the cluster.
-
#metadata_datasources(params = '') ⇒ Object
(also: #mt_datasources)
Return a Hash with metadata of datasources.
-
#metadata_datasources_segments(data_source, segment = '') ⇒ Object
(also: #mt_datasources_segments)
Return a Hash with metadata of segments.
-
#physical_servers ⇒ Object
(also: #physical_nodes)
URIs of the physical servers in the cluster.
-
#physical_workers ⇒ Object
URIs of the physical workers in the cluster.
-
#realtimes ⇒ Object
Returns only realtime nodes.
-
#reset! ⇒ Object
Reset the client.
-
#rules ⇒ Object
Return the rules applied to a cluster.
-
#servers ⇒ Object
(also: #nodes)
Return all nodes of the cluster.
-
#services ⇒ Object
Available services in the cluster.
-
#tiers ⇒ Object
Return all tiers defined in the cluster.
-
#workers ⇒ Object
Return all Workers (MiddleManager) of the cluster.
Methods included from Util
Constructor Details
#initialize(zk_uri, options) ⇒ Cluster
Initialize the client to perform the queries
Parameters:
- zk_uri
-
String with URI or URIs (separated by comma) of Zookeeper
- options
-
Hash with options:
- discovery_path: String with the discovery path of Druid
20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/druid_config/cluster.rb', line 20
#
# Initialize the client to perform the queries.
#
# == Parameters:
# zk_uri::
#   String with URI or URIs (separated by comma) of Zookeeper
# options::
#   Hash with options:
#     - discovery_path: String with the discovery path of Druid
#
def initialize(zk_uri, options)
  # Initialize the Client shared through the DruidConfig module
  DruidConfig.client = DruidConfig::Client.new(zk_uri, options)
  # Used to check the number of retries on error
  @retries = 0
  # Update the base uri to perform queries against the coordinator
  self.class.base_uri(
    "#{DruidConfig.client.coordinator}"\
    "druid/coordinator/#{DruidConfig::Version::API_VERSION}")
end
Instance Method Details
#close! ⇒ Object
Close connection with zookeeper
36 37 38 |
# File 'lib/druid_config/cluster.rb', line 36
#
# Close the connection with Zookeeper by delegating to the client stored
# in DruidConfig.client.
#
def close!
  zk_client = DruidConfig.client
  zk_client.close!
end
#datasource(datasource) ⇒ Object
Return a unique datasource
Parameters:
datasource:
String with the data source name
Returns:
DataSource instance
157 158 159 |
# File 'lib/druid_config/cluster.rb', line 157
#
# Return a unique datasource.
#
# == Parameters:
# datasource::
#   String with the data source name
#
# == Returns:
# The first element of #datasources whose name equals the given one, or nil
# when none matches.
#
def datasource(datasource)
  # `find` yields the single matching element; `select` would wrap the match
  # in an Array, which contradicts the documented "DataSource instance".
  datasources.find { |el| el.name == datasource }
end
#datasources ⇒ Object
Return all datasources
Returns:
Array of Datasource initialized.
136 137 138 139 140 141 142 143 144 145 |
# File 'lib/druid_config/cluster.rb', line 136
#
# Return all datasources.
#
# == Returns:
# Array of DruidConfig::Entities::DataSource, one per entry returned by the
# '/datasources?full' end point. Each one also receives the load status entry
# stored under its name (nil when the load status has no such key).
#
def datasources
  datasource_status = load_status
  secure_query do
    self.class.get('/datasources?full').map do |data|
      # Direct key lookup instead of scanning the whole status per datasource
      DruidConfig::Entities::DataSource.new(
        data, datasource_status[data['name']])
    end
  end
end
#historicals ⇒ Object
Returns only historical nodes
Returns:
Array of Nodes
235 236 237 |
# File 'lib/druid_config/cluster.rb', line 235
#
# Returns only historical nodes.
#
# == Returns:
# Array with the elements of #servers whose type is :historical
#
def historicals
  all_servers = servers
  all_servers.select do |server|
    server.type == :historical
  end
end
#leader ⇒ Object
Return the leader of the Druid cluster
67 68 69 70 71 |
# File 'lib/druid_config/cluster.rb', line 67
#
# Return the leader of the Druid cluster: the body of the response given by
# the '/leader' end point.
#
def leader
  secure_query { self.class.get('/leader').body }
end
#load_queue(params = '') ⇒ Object
Load queue of the cluster
85 86 87 88 89 |
# File 'lib/druid_config/cluster.rb', line 85
#
# Load queue of the cluster.
#
# == Parameters:
# params::
#   (Optional) String appended as query string to the '/loadqueue' end point
#
def load_queue(params = '')
  secure_query do
    end_point = "/loadqueue?#{params}"
    self.class.get(end_point)
  end
end
#load_status(params = '') ⇒ Object
Load status of the cluster
76 77 78 79 80 |
# File 'lib/druid_config/cluster.rb', line 76
#
# Load status of the cluster.
#
# == Parameters:
# params::
#   (Optional) String appended as query string to the '/loadstatus' end point
#
def load_status(params = '')
  secure_query do
    end_point = "/loadstatus?#{params}"
    self.class.get(end_point)
  end
end
#metadata_datasources(params = '') ⇒ Object Also known as: mt_datasources
Return a Hash with metadata of datasources
97 98 99 100 101 |
# File 'lib/druid_config/cluster.rb', line 97
#
# Return a Hash with metadata of datasources.
#
# == Parameters:
# params::
#   (Optional) String appended as query string to the
#   '/metadata/datasources' end point
#
def metadata_datasources(params = '')
  secure_query do
    self.class.get("/metadata/datasources?#{params}")
  end
end
#metadata_datasources_segments(data_source, segment = '') ⇒ Object Also known as: mt_datasources_segments
Return a Hash with metadata of segments
Parameters:
- data_source
-
String with the name of the data source
- segment
-
(Optional) Segment to search
114 115 116 117 118 119 120 121 122 123 |
# File 'lib/druid_config/cluster.rb', line 114
#
# Return a Hash with metadata of segments.
#
# == Parameters:
# data_source::
#   String with the name of the data source
# segment::
#   (Optional) Segment to search. When it is empty or 'full' it is sent as
#   the query string of the segments end point; any other value is appended
#   to the path to address a single segment.
#
def metadata_datasources_segments(data_source, segment = '')
  end_point = "/metadata/datasources/#{data_source}/segments"
  secure_query do
    # The end point must be built from the `segment` argument: there is no
    # `params` variable in this method.
    if segment.empty? || segment == 'full'
      self.class.get("#{end_point}?#{segment}")
    else
      self.class.get("#{end_point}/#{segment}")
    end
  end
end
#physical_servers ⇒ Object Also known as: physical_nodes
URIs of the physical servers in the cluster
Returns:
Array of strings
220 221 222 223 224 |
# File 'lib/druid_config/cluster.rb', line 220
#
# URIs of the physical servers in the cluster. The list is computed from
# #servers the first time and memoized in @physical_servers.
#
# == Returns:
# Array of strings
#
def physical_servers
  secure_query do
    @physical_servers ||= servers.map { |server| server.host }.uniq
  end
end
#physical_workers ⇒ Object
URIs of the physical workers in the cluster
280 281 282 |
# File 'lib/druid_config/cluster.rb', line 280
#
# URIs of the physical workers in the cluster. The list is computed from
# #workers the first time and memoized in @physical_workers.
#
def physical_workers
  @physical_workers ||= workers.map { |worker| worker.host }.uniq
end
#realtimes ⇒ Object
Returns only realtime nodes
Returns:
Array of Nodes
245 246 247 |
# File 'lib/druid_config/cluster.rb', line 245
#
# Returns only realtime nodes.
#
# == Returns:
# Array with the elements of #servers whose type is :realtime
#
def realtimes
  all_servers = servers
  all_servers.select do |server|
    server.type == :realtime
  end
end
#reset! ⇒ Object
Reset the client
43 44 45 46 47 48 |
# File 'lib/druid_config/cluster.rb', line 43
#
# Reset the client and point the base uri of the class to the coordinator
# reported by the client after the reset.
#
def reset!
  DruidConfig.client.reset!
  # Built after the reset so the coordinator is read from the fresh client
  coordinator_uri = "#{DruidConfig.client.coordinator}"\
                    "druid/coordinator/#{DruidConfig::Version::API_VERSION}"
  self.class.base_uri(coordinator_uri)
end
#rules ⇒ Object
Return the rules applied to a cluster
167 168 169 170 171 |
# File 'lib/druid_config/cluster.rb', line 167
#
# Return the rules applied to a cluster: the response of the '/rules'
# end point.
#
def rules
  secure_query { self.class.get('/rules') }
end
#servers ⇒ Object Also known as: nodes
Return all nodes of the cluster
Returns:
Array of node Objects
203 204 205 206 207 208 209 210 211 212 |
# File 'lib/druid_config/cluster.rb', line 203
#
# Return all nodes of the cluster.
#
# == Returns:
# Array of DruidConfig::Entities::Node, one per entry returned by the
# '/servers?full' end point. Each one also receives the load queue entry
# stored under its host (nil when the queue has no such key).
#
def servers
  secure_query do
    queue = load_queue('full')
    self.class.get('/servers?full').map do |data|
      # Direct key lookup instead of scanning the whole queue per server
      DruidConfig::Entities::Node.new(data, queue[data['host']])
    end
  end
end
#services ⇒ Object
Available services in the cluster
Returns:
Hash with the format:
{ server: [ services ], server2: [ services ], ... }
294 295 296 297 298 299 300 301 302 303 304 |
# File 'lib/druid_config/cluster.rb', line 294
#
# Available services in the cluster. The result is memoized in @services.
#
# == Returns:
# Hash with the format:
#   { server: [ services ], server2: [ services ], ... }
#
def services
  return @services if @services
  services = {}
  # Every physical node is listed, even when it runs no known service
  physical_nodes.each { |node| services[node] = [] }
  # A host may be missing from the physical nodes (for instance a worker
  # that is not reported as a server), so create its entry on demand instead
  # of calling << on nil.
  register = ->(host, service) { (services[host] ||= []) << service }
  # Load services
  realtimes.map(&:host).uniq.each { |r| register.call(r, :realtime) }
  historicals.map(&:host).uniq.each { |r| register.call(r, :historical) }
  physical_workers.each { |w| register.call(w, :middleManager) }
  # Return nodes
  @services = services
end
#tiers ⇒ Object
Return all tiers defined in the cluster
Returns:
Array of Tier instances
182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/druid_config/cluster.rb', line 182
#
# Return all tiers defined in the cluster.
#
# == Returns:
# Array of DruidConfig::Entities::Tier instances, one per distinct tier found
# in #servers, each built with the tier and the nodes that belong to it.
#
def tiers
  current_nodes = servers
  # Initialize tiers
  secure_query do
    # One pass groups the nodes by tier. Tiers come out in order of first
    # appearance and the nodes of each tier keep their original order.
    current_nodes.group_by(&:tier).map do |tier, tier_nodes|
      DruidConfig::Entities::Tier.new(tier, tier_nodes)
    end
  end
end
#workers ⇒ Object
Return all Workers (MiddleManager) of the cluster
Returns:
Array of Workers
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 |
# File 'lib/druid_config/cluster.rb', line 255
#
# Return all Workers (MiddleManager) of the cluster. The base uri of the class
# is switched to the overlord indexer API for the query and recovered
# afterwards.
#
# == Returns:
# Array of DruidConfig::Entities::Worker; empty when the query block never
# assigned a result.
#
def workers
  # Stash the base_uri
  stash_uri
  overlord_uri = "#{DruidConfig.client.overlord}"\
                 "druid/indexer/#{DruidConfig::Version::API_VERSION}"
  self.class.base_uri(overlord_uri)
  found = []
  # Perform a query
  begin
    secure_query do
      found = self.class.get('/workers').map do |raw_worker|
        DruidConfig::Entities::Worker.new(raw_worker)
      end
    end
  ensure
    # Recover the stashed base_uri whether or not the query succeeded
    pop_uri
  end
  # Return
  found
end