Class: DruidConfig::Cluster

Inherits:
Object
  • Object
show all
Includes:
Util, HTTParty
Defined in:
lib/druid_config/cluster.rb

Overview

Class to initialize the connection to Zookeeper

Instance Method Summary collapse

Methods included from Util

#secure_query

Constructor Details

#initialize(zk_uri, options) ⇒ Cluster

Initialize the client to perform the queries

Parameters:

zk_uri

String with URI or URIs (separated by comma) of Zookeeper

options

Hash with options:

- discovery_path: String with the discovery path of Druid


20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/druid_config/cluster.rb', line 20

#
# Set up the shared client and point the HTTP base URI at the coordinator.
#
# == Parameters:
# zk_uri::
#   String with URI or URIs (separated by comma) of Zookeeper
# options::
#   Hash with options, forwarded untouched to DruidConfig::Client.new
#
def initialize(zk_uri, options)
  # The client is stored at module level (DruidConfig.client)
  DruidConfig.client = DruidConfig::Client.new(zk_uri, options)

  # Retry counter starts at zero
  @retries = 0

  # Every query of this class is issued against the coordinator endpoint
  coordinator = DruidConfig.client.coordinator
  api_version = DruidConfig::Version::API_VERSION
  self.class.base_uri("#{coordinator}druid/coordinator/#{api_version}")
end

Instance Method Details

#close! ⇒ Object

Close connection with zookeeper



36
37
38
# File 'lib/druid_config/cluster.rb', line 36

#
# Close the connection by delegating to the shared DruidConfig client.
#
def close!
  client = DruidConfig.client
  client.close!
end

#datasource(datasource) ⇒ Object

Return a unique datasource

Parameters:

datasource:

String with the data source name

Returns:

Array with the DataSource instances whose name matches (empty when none does)



157
158
159
# File 'lib/druid_config/cluster.rb', line 157

#
# Filter the datasources of the cluster by name.
#
# == Parameters:
# datasource::
#   String with the data source name
#
# == Returns:
# Array with every element of #datasources whose name equals the given one
# (empty when nothing matches).
#
def datasource(datasource)
  datasources.select do |candidate|
    candidate.name == datasource
  end
end

#datasources ⇒ Object

Return all datasources

Returns:

Array of initialized DataSource instances.



136
137
138
139
140
141
142
143
144
145
# File 'lib/druid_config/cluster.rb', line 136

#
# Return all datasources
#
# The load status is fetched first; each datasource returned by the
# '/datasources?full' endpoint is then paired with the status entry whose
# key equals its 'name' (nil when there is none).
#
# == Returns:
# Array of DruidConfig::Entities::DataSource
#
def datasources
  statuses = load_status
  secure_query do
    self.class.get('/datasources?full').map do |data|
      own_status = statuses.select { |name, _| name == data['name'] }
      DruidConfig::Entities::DataSource.new(data, own_status.values.first)
    end
  end
end

#historicals ⇒ Object

Returns only historical nodes

Returns:

Array of Nodes



235
236
237
# File 'lib/druid_config/cluster.rb', line 235

#
# Keep only the servers whose type is :historical
#
# == Returns:
# Array of Nodes
#
def historicals
  servers.select do |server|
    server.type == :historical
  end
end

#leader ⇒ Object

Return the leader of the Druid cluster



67
68
69
70
71
# File 'lib/druid_config/cluster.rb', line 67

#
# Return the leader of the Druid cluster
#
# == Returns:
# Body of the response obtained from the '/leader' endpoint
#
def leader
  secure_query do
    response = self.class.get('/leader')
    response.body
  end
end

#load_queue(params = '') ⇒ Object

Load queue of the cluster



85
86
87
88
89
# File 'lib/druid_config/cluster.rb', line 85

#
# Load queue of the cluster
#
# == Parameters:
# params::
#   String appended verbatim after the '?' of the '/loadqueue' endpoint
#   (empty by default)
#
def load_queue(params = '')
  path = "/loadqueue?#{params}"
  secure_query { self.class.get(path) }
end

#load_status(params = '') ⇒ Object

Load status of the cluster



76
77
78
79
80
# File 'lib/druid_config/cluster.rb', line 76

#
# Load status of the cluster
#
# == Parameters:
# params::
#   String appended verbatim after the '?' of the '/loadstatus' endpoint
#   (empty by default)
#
def load_status(params = '')
  path = "/loadstatus?#{params}"
  secure_query { self.class.get(path) }
end

#metadata_datasources(params = '') ⇒ Object Also known as: mt_datasources

Return a Hash with metadata of datasources



97
98
99
100
101
# File 'lib/druid_config/cluster.rb', line 97

#
# Return the metadata of the datasources
#
# == Parameters:
# params::
#   String appended verbatim after the '?' of the '/metadata/datasources'
#   endpoint (empty by default)
#
def metadata_datasources(params = '')
  secure_query do
    self.class.get("/metadata/datasources?#{params}")
  end
end

#metadata_datasources_segments(data_source, segment = '') ⇒ Object Also known as: mt_datasources_segments

Return a Hash with metadata of segments

Parameters:

data_source

String with the name of the data source

segment

(Optional) Segment to search



114
115
116
117
118
119
120
121
122
123
# File 'lib/druid_config/cluster.rb', line 114

#
# Return the metadata of the segments of a data source
#
# == Parameters:
# data_source::
#   String with the name of the data source
# segment::
#   (Optional) Segment to search. When empty or 'full' it is sent as the
#   query string of the segments listing; any other value is appended as a
#   path component to address that single segment.
#
def metadata_datasources_segments(data_source, segment = '')
  end_point = "/metadata/datasources/#{data_source}/segments"
  path = if segment.empty? || segment == 'full'
           "#{end_point}?#{segment}"
         else
           "#{end_point}/#{segment}"
         end
  secure_query { self.class.get(path) }
end

#physical_servers ⇒ Object Also known as: physical_nodes

URIs of the physical servers in the cluster

Returns:

Array of strings



220
221
222
223
224
# File 'lib/druid_config/cluster.rb', line 220

#
# Hosts of the servers in the cluster, without duplicates. The list is
# computed on the first call and cached in @physical_servers afterwards.
#
# == Returns:
# Array with the distinct values of `host` among #servers
#
def physical_servers
  secure_query do
    @physical_servers ||= servers.map { |server| server.host }.uniq
  end
end

#physical_workers ⇒ Object

URIs of the physical workers in the cluster



280
281
282
# File 'lib/druid_config/cluster.rb', line 280

#
# Hosts of the workers in the cluster, without duplicates. The list is
# computed on the first call and cached in @physical_workers afterwards.
#
# == Returns:
# Array with the distinct values of `host` among #workers
#
def physical_workers
  @physical_workers ||= workers.map { |worker| worker.host }.uniq
end

#realtimes ⇒ Object

Returns only realtime nodes

Returns:

Array of Nodes



245
246
247
# File 'lib/druid_config/cluster.rb', line 245

#
# Keep only the servers whose type is :realtime
#
# == Returns:
# Array of Nodes
#
def realtimes
  servers.select do |server|
    server.type == :realtime
  end
end

#reset! ⇒ Object

Reset the client



43
44
45
46
47
48
# File 'lib/druid_config/cluster.rb', line 43

#
# Reset the shared client and rebuild the base URI from the coordinator it
# reports afterwards.
#
def reset!
  DruidConfig.client.reset!
  coordinator = DruidConfig.client.coordinator
  api_version = DruidConfig::Version::API_VERSION
  self.class.base_uri("#{coordinator}druid/coordinator/#{api_version}")
end

#rules ⇒ Object

Return the rules applied to a cluster



167
168
169
170
171
# File 'lib/druid_config/cluster.rb', line 167

#
# Return the rules applied to a cluster, as answered by the '/rules'
# endpoint.
#
def rules
  secure_query { self.class.get('/rules') }
end

#servers ⇒ Object Also known as: nodes

Return all nodes of the cluster

Returns:

Array of node Objects



203
204
205
206
207
208
209
210
211
212
# File 'lib/druid_config/cluster.rb', line 203

#
# Return all nodes of the cluster
#
# The full load queue is requested first; each server returned by the
# '/servers?full' endpoint is then paired with the queue entry whose key
# equals its 'host' (nil when there is none).
#
# == Returns:
# Array of DruidConfig::Entities::Node
#
def servers
  secure_query do
    pending = load_queue('full')
    listing = self.class.get('/servers?full')
    listing.map do |data|
      own_queue = pending.select { |host, _| host == data['host'] }
      DruidConfig::Entities::Node.new(data, own_queue.values.first)
    end
  end
end

#services ⇒ Object

Available services in the cluster

Returns:

Hash with the format:

{ server: [ services ], server2: [ services ], ... }


294
295
296
297
298
299
300
301
302
303
304
# File 'lib/druid_config/cluster.rb', line 294

#
# Available services in the cluster, grouped by host. The result is computed
# on the first call and cached in @services afterwards.
#
# == Returns:
# Hash with the format:
#   { server: [ services ], server2: [ services ], ... }
# where services are the symbols :realtime, :historical and :middleManager.
#
def services
  return @services if @services

  # Every physical node is listed, even when no service ends up tagged on it
  by_host = physical_nodes.each_with_object({}) { |node, acc| acc[node] = [] }

  # Entries are created on demand so that a host missing from physical_nodes
  # (e.g. a worker host that is not listed among the servers) is added to
  # the result instead of raising NoMethodError on nil.
  realtimes.map(&:host).uniq.each { |host| (by_host[host] ||= []) << :realtime }
  historicals.map(&:host).uniq.each { |host| (by_host[host] ||= []) << :historical }
  physical_workers.each { |host| (by_host[host] ||= []) << :middleManager }

  @services = by_host
end

#tiers ⇒ Object

Return all tiers defined in the cluster

Returns:

Array of Tier instances



182
183
184
185
186
187
188
189
190
191
192
# File 'lib/druid_config/cluster.rb', line 182

#
# Return all tiers defined in the cluster
#
# The servers are fetched once; one Tier is built per distinct `tier` value
# found among them, receiving the servers that belong to it.
#
# == Returns:
# Array of Tier instances
#
def tiers
  nodes = servers
  secure_query do
    tier_names = nodes.map(&:tier).uniq
    tier_names.map do |name|
      members = nodes.select { |node| node.tier == name }
      DruidConfig::Entities::Tier.new(name, members)
    end
  end
end

#workers ⇒ Object

Return all Workers (MiddleManager) of the cluster

Returns:

Array of Workers



255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
# File 'lib/druid_config/cluster.rb', line 255

#
# Return all Workers (MiddleManager) of the cluster
#
# The base URI is pointed at the overlord indexer endpoint for the duration
# of the query and recovered afterwards, even when the query raises.
#
# == Returns:
# Array of Workers (empty unless the query assigned a result)
#
def workers
  # Save the current base_uri before switching to the overlord
  stash_uri
  overlord = DruidConfig.client.overlord
  api_version = DruidConfig::Version::API_VERSION
  self.class.base_uri("#{overlord}druid/indexer/#{api_version}")

  found = []
  begin
    secure_query do
      found = self.class.get('/workers').map do |data|
        DruidConfig::Entities::Worker.new(data)
      end
    end
  ensure
    # Bring back the stashed base_uri
    pop_uri
  end
  found
end