Class: DruidDB::Node::Coordinator
- Inherits:
-
Object
- Object
- DruidDB::Node::Coordinator
- Defined in:
- lib/druiddb/node/coordinator.rb
Constant Summary collapse
- DATASOURCES_PATH =
'/druid/coordinator/v1/datasources/'.freeze
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#zk ⇒ Object
readonly
Returns the value of attribute zk.
Instance Method Summary collapse
-
#connection ⇒ Object
TODO: DRY; copy/paste from broker.
-
#datasource_enabled?(datasource_name) ⇒ Boolean
TODO: This should either be private or moved to datasource.
-
#datasource_has_segments?(datasource_name) ⇒ Boolean
TODO: This should either be private or moved to datasource.
- #datasource_info(datasource_name) ⇒ Object
- #disable_datasource(datasource_name) ⇒ Object
- #disable_segment(datasource_name, segment) ⇒ Object
-
#disable_segments(datasource_name) ⇒ Object
TODO: This should either be private or moved to datasource.
-
#initialize(config, zk) ⇒ Coordinator
constructor
A new instance of Coordinator.
- #issue_kill_task(datasource_name, interval) ⇒ Object
- #list_datasources(url_params = {}) ⇒ Object
- #list_segments(datasource_name) ⇒ Object
Constructor Details
#initialize(config, zk) ⇒ Coordinator
Returns a new instance of Coordinator.
7 8 9 10 |
# File 'lib/druiddb/node/coordinator.rb', line 7 def initialize(config, zk) @config = config @zk = zk end |
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
6 7 8 |
# File 'lib/druiddb/node/coordinator.rb', line 6 def config @config end |
#zk ⇒ Object (readonly)
Returns the value of attribute zk.
6 7 8 |
# File 'lib/druiddb/node/coordinator.rb', line 6 def zk @zk end |
Instance Method Details
#connection ⇒ Object
TODO: DRY; copy/paste from broker
13 14 15 16 17 18 19 |
# File 'lib/druiddb/node/coordinator.rb', line 13 def connection coordinator = zk.registry["#{config.discovery_path}/druid:coordinator"].first raise DruidDB::ConnectionError, 'no druid coordinators available' if coordinator.nil? # round-robin load balancing zk.registry["#{config.discovery_path}/druid:coordinator"].rotate! DruidDB::Connection.new(host: coordinator[:host], port: coordinator[:port]) end |
#datasource_enabled?(datasource_name) ⇒ Boolean
TODO: This should either be private or moved to datasource
41 42 43 |
# File 'lib/druiddb/node/coordinator.rb', line 41 def datasource_enabled?(datasource_name) list_datasources.include? datasource_name end |
#datasource_has_segments?(datasource_name) ⇒ Boolean
TODO: This should either be private or moved to datasource
46 47 48 |
# File 'lib/druiddb/node/coordinator.rb', line 46 def datasource_has_segments?(datasource_name) list_segments(datasource_name).any? end |
#datasource_info(datasource_name) ⇒ Object
21 22 23 24 25 26 27 |
# File 'lib/druiddb/node/coordinator.rb', line 21 def datasource_info(datasource_name) response = connection.get(DATASOURCES_PATH + datasource_name.to_s, full: true) unless response.code.to_i == 200 raise ConnectionError, 'Unable to retrieve datasource information.' end JSON.parse(response.body) end |
#disable_datasource(datasource_name) ⇒ Object
29 30 31 32 33 34 35 36 37 38 |
# File 'lib/druiddb/node/coordinator.rb', line 29 def disable_datasource(datasource_name) # response = connection.delete(DATASOURCES_PATH + datasource_name.to_s) # raise ConnectionError, 'Unable to disable datasource' unless response.code.to_i == 200 # return true if response.code.to_i == 200 # This is a workaround for https://github.com/druid-io/druid/issues/3154 disable_segments(datasource_name) bounded_wait_for_segments_disable(datasource_name) true end |
#disable_segment(datasource_name, segment) ⇒ Object
50 51 52 53 54 |
# File 'lib/druiddb/node/coordinator.rb', line 50 def disable_segment(datasource_name, segment) response = connection.delete(DATASOURCES_PATH + datasource_name + '/segments/' + segment) raise ConnectionError, "Unable to disable #{segment}" unless response.code.to_i == 200 true end |
#disable_segments(datasource_name) ⇒ Object
TODO: This should either be private or moved to datasource
57 58 59 60 |
# File 'lib/druiddb/node/coordinator.rb', line 57 def disable_segments(datasource_name) segments = list_segments(datasource_name) segments.each { |segment| disable_segment(datasource_name, segment) } end |
#issue_kill_task(datasource_name, interval) ⇒ Object
62 63 64 65 66 |
# File 'lib/druiddb/node/coordinator.rb', line 62 def issue_kill_task(datasource_name, interval) response = connection.delete(DATASOURCES_PATH + datasource_name + '/intervals/' + interval) raise ConnectionError, 'Unable to issue kill task.' unless response.code.to_i == 200 true end |
#list_datasources(url_params = {}) ⇒ Object
68 69 70 71 |
# File 'lib/druiddb/node/coordinator.rb', line 68 def list_datasources(url_params = {}) response = connection.get(DATASOURCES_PATH, url_params) JSON.parse(response.body) if response.code.to_i == 200 end |
#list_segments(datasource_name) ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/druiddb/node/coordinator.rb', line 73 def list_segments(datasource_name) response = connection.get(DATASOURCES_PATH + datasource_name + '/segments', full: true) case response.code.to_i when 200 JSON.parse(response.body).map { |segment| segment['identifier'] } when 204 [] else raise ConnectionError, "Unable to list segments for #{datasource_name}" end end |