Class: DruidDB::Node::Coordinator

Inherits:
Object
  • Object
show all
Defined in:
lib/druiddb/node/coordinator.rb

Constant Summary collapse

DATASOURCES_PATH =
'/druid/coordinator/v1/datasources/'.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config, zk) ⇒ Coordinator

Returns a new instance of Coordinator.



7
8
9
10
# File 'lib/druiddb/node/coordinator.rb', line 7

# Stores the two collaborators this node needs; no other work is done here.
#
# @param config [Object] kept in @config (read back through #config)
# @param zk [Object] kept in @zk (read back through #zk)
def initialize(config, zk)
  @config = config
  @zk = zk
end

Instance Attribute Details

#config ⇒ Object (readonly)

Returns the value of attribute config.



6
7
8
# File 'lib/druiddb/node/coordinator.rb', line 6

# Read-only accessor for the configuration object.
#
# @return [Object] whatever was assigned to @config by the constructor
def config
  @config
end

#zk ⇒ Object (readonly)

Returns the value of attribute zk.



6
7
8
# File 'lib/druiddb/node/coordinator.rb', line 6

# Read-only accessor for the zk collaborator.
#
# @return [Object] whatever was assigned to @zk by the constructor
def zk
  @zk
end

Instance Method Details

#connection ⇒ Object

TODO: DRY; copy/paste from broker



13
14
15
16
17
18
19
# File 'lib/druiddb/node/coordinator.rb', line 13

# Picks a coordinator out of zk.registry and builds a connection to it.
#
# TODO: DRY; copy/paste from broker
#
# @return [DruidDB::Connection] created from the chosen entry's :host and :port
# @raise [DruidDB::ConnectionError] when the registry entry is missing or empty
def connection
  # Look the entry up once so the node handed out and the list rotated below
  # are guaranteed to be the same array.
  coordinators = zk.registry["#{config.discovery_path}/druid:coordinator"]
  # A nil entry is reported exactly like an empty one instead of blowing up
  # with NoMethodError on nil.
  coordinator = coordinators.first if coordinators
  raise DruidDB::ConnectionError, 'no druid coordinators available' if coordinator.nil?
  # round-robin load balancing
  coordinators.rotate!
  DruidDB::Connection.new(host: coordinator[:host], port: coordinator[:port])
end

#datasource_enabled?(datasource_name) ⇒ Boolean

TODO: This should either be private or moved to datasource

Returns:

  • (Boolean)


41
42
43
# File 'lib/druiddb/node/coordinator.rb', line 41

# Tells whether the given datasource appears in #list_datasources.
#
# TODO: This should either be private or moved to datasource
#
# @param datasource_name [#to_s] name to look for; compared as a String so a
#   Symbol matches the String names produced by JSON parsing
# @return [Boolean]
# @raise [ConnectionError] when #list_datasources yields nil (it does so for
#   any non-200 response), instead of failing with NoMethodError on nil
def datasource_enabled?(datasource_name)
  datasources = list_datasources
  raise ConnectionError, 'Unable to list datasources.' if datasources.nil?
  datasources.include?(datasource_name.to_s)
end

#datasource_has_segments?(datasource_name) ⇒ Boolean

TODO: This should either be private or moved to datasource

Returns:

  • (Boolean)


46
47
48
# File 'lib/druiddb/node/coordinator.rb', line 46

# Reports whether #list_segments returns anything truthy for the datasource.
#
# TODO: This should either be private or moved to datasource
#
# @param datasource_name [Object] forwarded untouched to #list_segments
# @return [Boolean]
def datasource_has_segments?(datasource_name)
  segment_ids = list_segments(datasource_name)
  segment_ids.any?
end

#datasource_info(datasource_name) ⇒ Object



21
22
23
24
25
26
27
# File 'lib/druiddb/node/coordinator.rb', line 21

# Fetches the coordinator's description of one datasource.
#
# @param datasource_name [#to_s] appended to DATASOURCES_PATH
# @return [Object] the response body parsed as JSON
# @raise [ConnectionError] when the response code is anything but 200
def datasource_info(datasource_name)
  path = "#{DATASOURCES_PATH}#{datasource_name}"
  response = connection.get(path, full: true)
  raise ConnectionError, 'Unable to retrieve datasource information.' if response.code.to_i != 200

  JSON.parse(response.body)
end

#disable_datasource(datasource_name) ⇒ Object



29
30
31
32
33
34
35
36
37
38
# File 'lib/druiddb/node/coordinator.rb', line 29

# Disables a datasource by disabling each of its segments and then waiting
# (via #bounded_wait_for_segments_disable) for that to take effect.
#
# @param datasource_name [Object] forwarded to both helper calls
# @return [true]
def disable_datasource(datasource_name)
  # A single DELETE on DATASOURCES_PATH + datasource_name is intentionally not
  # issued here; going segment by segment is a workaround for
  # https://github.com/druid-io/druid/issues/3154
  disable_segments(datasource_name)
  bounded_wait_for_segments_disable(datasource_name)

  true
end

#disable_segment(datasource_name, segment) ⇒ Object

Raises:

  • (ConnectionError)

50
51
52
53
54
# File 'lib/druiddb/node/coordinator.rb', line 50

# Sends a DELETE for one segment of a datasource.
#
# The path is built by interpolation rather than String#+, so a Symbol
# datasource name works here just as it does in #datasource_info.
#
# @param datasource_name [#to_s] datasource the segment belongs to
# @param segment [#to_s] segment identifier placed after "/segments/"
# @return [true] when the response code is 200
# @raise [ConnectionError] for any other response code
def disable_segment(datasource_name, segment)
  path = "#{DATASOURCES_PATH}#{datasource_name}/segments/#{segment}"
  response = connection.delete(path)
  raise ConnectionError, "Unable to disable #{segment}" unless response.code.to_i == 200
  true
end

#disable_segments(datasource_name) ⇒ Object

TODO: This should either be private or moved to datasource



57
58
59
60
# File 'lib/druiddb/node/coordinator.rb', line 57

# Calls #disable_segment once for every identifier that #list_segments
# returns for the datasource.
#
# TODO: This should either be private or moved to datasource
#
# @param datasource_name [Object] forwarded to #list_segments and #disable_segment
# @return [Object] the collection returned by #list_segments
def disable_segments(datasource_name)
  list_segments(datasource_name).each do |segment_id|
    disable_segment(datasource_name, segment_id)
  end
end

#issue_kill_task(datasource_name, interval) ⇒ Object

Raises:

  • (ConnectionError)

62
63
64
65
66
# File 'lib/druiddb/node/coordinator.rb', line 62

# Sends a DELETE to the datasource's "/intervals/<interval>" path.
#
# @param datasource_name [#to_s] datasource whose interval is targeted
# @param interval [#to_s] interval to put in the path; a "/" separating the
#   two endpoints is rewritten to "_" because a slash would split the URL
#   path (the Druid coordinator API uses "_" as the delimiter here)
# @return [true] when the response code is 200
# @raise [ConnectionError] for any other response code
def issue_kill_task(datasource_name, interval)
  # Already-underscored intervals pass through unchanged.
  path_interval = interval.to_s.tr('/', '_')
  response = connection.delete("#{DATASOURCES_PATH}#{datasource_name}/intervals/#{path_interval}")
  raise ConnectionError, 'Unable to issue kill task.' unless response.code.to_i == 200
  true
end

#list_datasources(url_params = {}) ⇒ Object



68
69
70
71
# File 'lib/druiddb/node/coordinator.rb', line 68

# Requests DATASOURCES_PATH from the coordinator.
#
# @param url_params [Hash] passed as the second argument to connection.get
# @return [Object, nil] the JSON-parsed body when the response code is 200,
#   otherwise nil
def list_datasources(url_params = {})
  response = connection.get(DATASOURCES_PATH, url_params)
  return unless response.code.to_i == 200

  JSON.parse(response.body)
end

#list_segments(datasource_name) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
# File 'lib/druiddb/node/coordinator.rb', line 73

# Lists the segment identifiers the coordinator reports for a datasource.
#
# The path is built by interpolation rather than String#+, so a Symbol
# datasource name works here just as it does in #datasource_info.
#
# @param datasource_name [#to_s] datasource whose segments are requested
# @return [Array] the 'identifier' value of each entry in the parsed body for
#   a 200 response; an empty array for a 204 response
# @raise [ConnectionError] for any other response code
def list_segments(datasource_name)
  response = connection.get("#{DATASOURCES_PATH}#{datasource_name}/segments", full: true)
  case response.code.to_i
  when 200
    JSON.parse(response.body).map { |segment| segment['identifier'] }
  when 204
    # No content: nothing to list.
    []
  else
    raise ConnectionError, "Unable to list segments for #{datasource_name}"
  end
end