Class: DruidDB::Node::Overlord
- Inherits:
-
Object
- Object
- DruidDB::Node::Overlord
- Defined in:
- lib/druiddb/node/overlord.rb
Constant Summary collapse
- INDEXER_PATH =
'/druid/indexer/v1/'.freeze
- RUNNING_TASKS_PATH =
(INDEXER_PATH + 'runningTasks').freeze
- TASK_PATH =
(INDEXER_PATH + 'task/').freeze
- SUPERVISOR_PATH =
(INDEXER_PATH + 'supervisor/').freeze
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#zk ⇒ Object
readonly
Returns the value of attribute zk.
Instance Method Summary collapse
-
#connection ⇒ Object
TODO: DRY this up; the method body is copy/pasted and should be shared rather than duplicated.
-
#initialize(config, zk) ⇒ Overlord
constructor
A new instance of Overlord.
- #running_tasks(datasource_name = nil) ⇒ Object
- #shutdown_task(task) ⇒ Object
- #shutdown_tasks(datasource_name = nil) ⇒ Object
- #submit_supervisor_spec(filepath) ⇒ Object
- #supervisor_tasks ⇒ Object
Constructor Details
#initialize(config, zk) ⇒ Overlord
Returns a new instance of Overlord.
10 11 12 13 |
# File 'lib/druiddb/node/overlord.rb', line 10

# Creates a new Overlord and keeps both collaborators for later use.
#
# @param config [Object] stored in @config; #connection reads its
#   +discovery_path+
# @param zk [Object] stored in @zk; #connection reads its +registry+
def initialize(config, zk)
  @config, @zk = config, zk
end
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
9 10 11 |
# File 'lib/druiddb/node/overlord.rb', line 9

# Read-only accessor.
#
# @return [Object] the value held in @config (assigned by the constructor)
def config
  @config
end
#zk ⇒ Object (readonly)
Returns the value of attribute zk.
9 10 11 |
# File 'lib/druiddb/node/overlord.rb', line 9

# Read-only accessor.
#
# @return [Object] the value held in @zk (assigned by the constructor)
def zk
  @zk
end
Instance Method Details
#connection ⇒ Object
TODO: DRY this up; the method body is copy/pasted and should be shared rather than duplicated.
16 17 18 19 20 21 |
# File 'lib/druiddb/node/overlord.rb', line 16

# Builds a connection to the overlord currently at the head of the registry
# list, then rotates that list so the next call picks a different entry.
#
# @return [DruidDB::Connection] connection built from the chosen entry's
#   +:host+ and +:port+
# @raise [DruidDB::ConnectionError] if the registry has no entry, or an empty
#   list, under "<discovery_path>/druid:overlord"
def connection
  # Look the list up once so the entry we read and the list we rotate are
  # guaranteed to be the same object.
  overlords = zk.registry["#{config.discovery_path}/druid:overlord"]
  # A missing registry key yields nil; treat it like an empty list instead of
  # letting `.first` blow up with NoMethodError.
  overlord = overlords && overlords.first
  raise DruidDB::ConnectionError, 'no druid overlords available' if overlord.nil?

  overlords.rotate! # round-robin load balancing
  DruidDB::Connection.new(host: overlord[:host], port: overlord[:port])
end
#running_tasks(datasource_name = nil) ⇒ Object
23 24 25 26 27 28 29 |
# File 'lib/druiddb/node/overlord.rb', line 23

# Lists the ids of the tasks returned by the running-tasks endpoint.
#
# @param datasource_name [String, nil] when given, only ids that contain this
#   string are returned (plain substring match on the id)
# @return [Array] the +id+ field of every entry in the parsed response;
#   empty when nothing matches
# @raise [DruidDB::ConnectionError] if the response code is not 200
def running_tasks(datasource_name = nil)
  response = connection.get(RUNNING_TASKS_PATH)
  unless response.code.to_i == 200
    raise DruidDB::ConnectionError, 'Could not retrieve running tasks'
  end

  # `map` always yields an Array, so no nil fallback is needed afterwards.
  task_ids = JSON.parse(response.body).map { |task| task['id'] }
  return task_ids unless datasource_name

  task_ids.select { |task_id| task_id.include?(datasource_name) }
end
#shutdown_task(task) ⇒ Object
31 32 33 34 35 |
# File 'lib/druiddb/node/overlord.rb', line 31

# Posts a shutdown request for a single task, then hands the task to
# bounded_wait_for_shutdown.
#
# @param task [String] task id; concatenated into the request path
# @return [Object] whatever bounded_wait_for_shutdown returns
# @raise [ConnectionError] if the response code is not 200
def shutdown_task(task)
  shutdown_path = TASK_PATH + task + '/shutdown'
  reply = connection.post(shutdown_path)
  status = reply.code.to_i
  raise ConnectionError, 'Unable to shutdown task' if status != 200

  bounded_wait_for_shutdown(task)
end
#shutdown_tasks(datasource_name = nil) ⇒ Object
37 38 39 40 |
# File 'lib/druiddb/node/overlord.rb', line 37

# Shuts down every task returned by #running_tasks, one at a time.
#
# @param datasource_name [String, nil] passed straight through to
#   #running_tasks
# @return [Array] the list of task ids that #running_tasks returned
def shutdown_tasks(datasource_name = nil)
  running_tasks(datasource_name).each do |task_id|
    shutdown_task(task_id)
  end
end
#submit_supervisor_spec(filepath) ⇒ Object
48 49 50 51 52 53 |
# File 'lib/druiddb/node/overlord.rb', line 48

# Reads a JSON spec from disk and posts it to the supervisor endpoint.
#
# @param filepath [String] path of the JSON file to read and parse
# @return [Object] the parsed JSON body of the response
# @raise [ConnectionError] if the response code is not 200
def submit_supervisor_spec(filepath)
  # Parse before touching the connection so a bad file fails first.
  payload = JSON.parse(File.read(filepath))
  reply = connection.post(SUPERVISOR_PATH, payload)
  raise ConnectionError, 'Unable to submit spec' if reply.code.to_i != 200

  JSON.parse(reply.body)
end
#supervisor_tasks ⇒ Object
42 43 44 45 46 |
# File 'lib/druiddb/node/overlord.rb', line 42

# Fetches the supervisor endpoint and returns its parsed JSON body.
#
# @return [Object] the parsed JSON body of the response
# @raise [ConnectionError] if the response code is not 200
def supervisor_tasks
  reply = connection.get(SUPERVISOR_PATH)
  return JSON.parse(reply.body) if reply.code.to_i == 200

  raise ConnectionError, 'Could not retrieve supervisors'
end