Class: DruidDB::Node::Overlord

Inherits:
Object
  • Object
show all
Defined in:
lib/druiddb/node/overlord.rb

Constant Summary collapse

INDEXER_PATH =
'/druid/indexer/v1/'.freeze
RUNNING_TASKS_PATH =
(INDEXER_PATH + 'runningTasks').freeze
TASK_PATH =
(INDEXER_PATH + 'task/').freeze
SUPERVISOR_PATH =
(INDEXER_PATH + 'supervisor/').freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config, zk) ⇒ Overlord

Returns a new instance of Overlord.



10
11
12
13
# File 'lib/druiddb/node/overlord.rb', line 10

def initialize(config, zk)
  @config = config
  @zk = zk
end

Instance Attribute Details

#configObject (readonly)

Returns the value of attribute config.



9
10
11
# File 'lib/druiddb/node/overlord.rb', line 9

def config
  @config
end

#zkObject (readonly)

Returns the value of attribute zk.



9
10
11
# File 'lib/druiddb/node/overlord.rb', line 9

def zk
  @zk
end

Instance Method Details

#connectionObject

TODO: DRY: copy/paste



16
17
18
19
20
21
# File 'lib/druiddb/node/overlord.rb', line 16

def connection
  overlord = zk.registry["#{config.discovery_path}/druid:overlord"].first
  raise DruidDB::ConnectionError, 'no druid overlords available' if overlord.nil?
  zk.registry["#{config.discovery_path}/druid:overlord"].rotate! # round-robin load balancing
  DruidDB::Connection.new(host: overlord[:host], port: overlord[:port])
end

#running_tasks(datasource_name = nil) ⇒ Object

Raises:



23
24
25
26
27
28
29
# File 'lib/druiddb/node/overlord.rb', line 23

def running_tasks(datasource_name = nil)
  response = connection.get(RUNNING_TASKS_PATH)
  raise ConnectionError, 'Could not retrieve running tasks' unless response.code.to_i == 200
  tasks = JSON.parse(response.body).map { |task| task['id'] }
  tasks.select! { |task| task.include? datasource_name } if datasource_name
  tasks ? tasks : []
end

#shutdown_task(task) ⇒ Object

Raises:



31
32
33
34
35
# File 'lib/druiddb/node/overlord.rb', line 31

def shutdown_task(task)
  response = connection.post(TASK_PATH + task + '/shutdown')
  raise ConnectionError, 'Unable to shutdown task' unless response.code.to_i == 200
  bounded_wait_for_shutdown(task)
end

#shutdown_tasks(datasource_name = nil) ⇒ Object



37
38
39
40
# File 'lib/druiddb/node/overlord.rb', line 37

def shutdown_tasks(datasource_name = nil)
  tasks = running_tasks(datasource_name)
  tasks.each { |task| shutdown_task(task) }
end

#submit_supervisor_spec(filepath) ⇒ Object

Raises:



48
49
50
51
52
53
# File 'lib/druiddb/node/overlord.rb', line 48

def submit_supervisor_spec(filepath)
  spec = JSON.parse(File.read(filepath))
  response = connection.post(SUPERVISOR_PATH, spec)
  raise ConnectionError, 'Unable to submit spec' unless response.code.to_i == 200
  JSON.parse(response.body)
end

#supervisor_tasksObject

Raises:



42
43
44
45
46
# File 'lib/druiddb/node/overlord.rb', line 42

def supervisor_tasks
  response = connection.get(SUPERVISOR_PATH)
  raise ConnectionError, 'Could not retrieve supervisors' unless response.code.to_i == 200
  JSON.parse(response.body)
end