Class: DruidConfig::ZK

Inherits:
Object
  • Object
show all
Defined in:
lib/druid_config/zk.rb

Overview

Class that connects to Zookeeper and retrieves information about the nodes in the cluster.

Constant Summary collapse

COORDINATOR =

Coordinator service

'coordinator'
OVERLORD =
'overlord'
SERVICES =
[COORDINATOR, OVERLORD]

Instance Method Summary collapse

Constructor Details

#initialize(uri, opts = {}) ⇒ ZK

Initialize variables and call register

Parameters:

uri

URI of Zookeeper

opts

Hash with options:

- discovery_path: Custom URL of discovery path for Druid


31
32
33
34
35
36
37
38
39
# File 'lib/druid_config/zk.rb', line 31

# Build the client: open the Zookeeper connection, set up empty bookkeeping
# state and load the initial data through #register.
#
# uri  - URI of Zookeeper, handed to ::ZK.new
# opts - Hash with options:
#        :discovery_path - overrides the '/discovery' default
def initialize(uri, opts = {})
  # Zookeeper connection handle
  @zk = ::ZK.new(uri, chroot: :check)
  @discovery_path = opts[:discovery_path] || '/discovery'
  # Looking up an unknown service stores and returns an empty node list
  @registry = Hash.new { |registry, service| registry[service] = [] }
  @verify_retry = 0
  @watched_services = {}
  register
end

Instance Method Details

#check_service(service) ⇒ Object

Check a given Druid service. For now we only need to track the coordinator and overlord services. This method creates a watcher on the service to detect changes.

This method gets the available nodes from the Zookeeper path. For each node returned, it tries to connect to the /status endpoint to check whether the node is available. The available nodes are then stored in @registry.

Parameters:

service

String with the service to check



209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# File 'lib/druid_config/zk.rb', line 209

# Refresh the registry entry of one Druid service.
#
# Does nothing when the service is not listed in SERVICES or already has a
# watcher. Otherwise it installs the watcher, reads the children of the
# service path (with watch: true), keeps the nodes that verify_node accepts
# and hands the resulting list to register_service.
#
# service - String with the service to check
def check_service(service)
  return unless SERVICES.include?(service)
  return if @watched_services.key?(service)

  # Start to watch this service
  watch_service(service)

  node_names = @zk.children(watch_path(service), watch: true)
  available = node_names.each_with_object([]) do |node_name, nodes|
    node_uri = verify_node(node_name, service)
    # A falsy answer from verify_node means the node is skipped
    nodes << { name: node_name, uri: node_uri } if node_uri
  end

  # Register the verified nodes in the registry
  register_service(service, available)
end

#check_services ⇒ Object

Check current services



135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/druid_config/zk.rb', line 135

# Synchronise the registry with the services listed under the discovery
# path: services that are registered locally but no longer listed are
# unregistered, then every listed service is passed to check_service.
def check_services
  $log.info("druid.zk checking services") if $log
  discovered = @zk.children(@discovery_path, watch: true)

  # Registered services that are no longer listed in Zookeeper
  stale = services - discovered
  stale.each { |name| unregister_service(name) }

  discovered.each { |name| check_service(name) }
end

#close! ⇒ Object

Force the Zookeeper connection to close



57
58
59
60
# File 'lib/druid_config/zk.rb', line 57

# Log the shutdown (when a global logger is set) and close the Zookeeper
# connection. Returns whatever @zk.close! returns.
def close!
  logger = $log
  logger.info('druid.zk shutting down') if logger
  @zk.close!
end

#coordinator ⇒ Object

Return the URI of a random available coordinator (poor man's load balancing).



66
67
68
# File 'lib/druid_config/zk.rb', line 66

# Return the URI of a randomly chosen registered coordinator node, or nil
# when none is registered, by delegating to random_node with the COORDINATOR
# service name.
def coordinator
  random_node(COORDINATOR)
end

#overlord ⇒ Object

Return the URI of a random available overlord (poor man's load balancing).



74
75
76
# File 'lib/druid_config/zk.rb', line 74

# Return the URI of a randomly chosen registered overlord node, or nil when
# none is registered, by delegating to random_node with the OVERLORD service
# name.
def overlord
  random_node(OVERLORD)
end

#random_node(service) ⇒ Object

Return a random value of a service

Parameters:

service

String with the name of the service



85
86
87
88
89
# File 'lib/druid_config/zk.rb', line 85

# Pick the URI of one registered node of the given service at random
# (poor man's load balancing).
#
# The registry is read with Hash#fetch rather than #[] so that asking for a
# service that is not registered does not fire the registry's default block
# and silently insert an empty entry, which #services would then report.
#
# service - String with the name of the service
#
# Returns the :uri value of a random registered node, or nil when no node is
# registered for the service.
def random_node(service)
  nodes = @registry.fetch(service, [])
  return nil if nodes.empty?
  # Return a random node from the available ones
  nodes.sample[:uri]
end

#register ⇒ Object

Load the data from Zookeeper



44
45
46
47
48
49
50
51
52
# File 'lib/druid_config/zk.rb', line 44

# Load the data from Zookeeper.
#
# Installs the expired-session handler and the child-event callback on the
# discovery path, then runs check_services.
#
# The handlers are installed only on the first call. register is re-invoked
# from its own expired-session handler, so installing them on every call
# would stack one more expired-session handler and one more discovery-path
# callback after each expiry, and every later event would run check_services
# several times.
# NOTE(review): this presumes callbacks registered on the ZK client stay
# registered across a session expiry and only the watches need re-arming
# (done by check_services) - confirm against the zk gem documentation.
def register
  $log.info('druid.zk register discovery path') if $log
  unless @discovery_handlers_registered
    @zk.on_expired_session { register }
    @zk.register(@discovery_path, only: :child) do
      $log.info('druid.zk got event on discovery path') if $log
      check_services
    end
    @discovery_handlers_registered = true
  end
  check_services
end

#register_service(service, brokers) ⇒ Object

Register a new service



94
95
96
97
98
# File 'lib/druid_config/zk.rb', line 94

# Store the list of nodes for a service in the registry, replacing any
# previous list (poor man's load balancing picks from this list).
#
# service - name of the service
# brokers - list of nodes to store for it
#
# Returns the stored list.
def register_service(service, brokers)
  if $log
    $log.info("druid.zk register", service: service, brokers: brokers)
  end
  @registry.store(service, brokers)
end

#services ⇒ Object

Get all available services



233
234
235
# File 'lib/druid_config/zk.rb', line 233

# List the service names currently present as keys of the registry.
def services
  @registry.keys
end

#to_s ⇒ Object



237
238
239
# File 'lib/druid_config/zk.rb', line 237

# String form of this object: the registry rendered with its own to_s.
def to_s
  @registry.to_s
end

#unregister_service(service) ⇒ Object

Unregister a service



103
104
105
106
107
# File 'lib/druid_config/zk.rb', line 103

# Drop a service from the registry and stop watching it.
#
# service - name of the service to remove
#
# Returns the result of unwatch_service.
def unregister_service(service)
  logger = $log
  if logger
    logger.info("druid.zk unregister", service: service)
  end
  @registry.delete(service)
  unwatch_service(service)
end

#unwatch_service(service) ⇒ Object

Unset a service to watch



126
127
128
129
130
# File 'lib/druid_config/zk.rb', line 126

# Stop watching a service: remove its stored watcher and unregister it.
# Does nothing (and returns nil) when the service is not being watched.
#
# service - name of the service
def unwatch_service(service)
  if @watched_services.key?(service)
    $log.info("druid.zk unwatch", service: service) if $log
    subscription = @watched_services.delete(service)
    subscription.unregister
  end
end

#verify_node(name, service) ⇒ Object

Verify whether a node is available. To check it, this method performs a GET request to the /status endpoint. On failure it retries up to three times, with delays of 0.8, 1.6 and 2.4 seconds.

Parameters:

name

String with the name of the coordinator

service

String with the service

Returns:

URI of the coordinator or false



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/druid_config/zk.rb', line 162

# Verify that a node of a service is reachable.
#
# Reads the node's data from Zookeeper, parses it as JSON, builds the base
# URI from its 'address' and 'port' fields and performs a GET on
# "<uri>status" with 5 second timeouts.
#
# Any StandardError raised along the way (Zookeeper read, JSON parsing, HTTP
# request) triggers a retry: up to three retries, sleeping 0.8, 1.6 and 2.4
# seconds before them. The retry counter is reset when the method finishes.
# NOTE(review): the counter lives in @verify_retry, so concurrent calls on
# the same object would share it - confirm this is only called from one
# thread at a time.
#
# name    - String with the name of the node
# service - String with the service
#
# Returns the URI of the node, or false when it is not available.
def verify_node(name, service)
  $log.info("druid.zk verify", node: name, service: service) if $log
  info = @zk.get("#{watch_path(service)}/#{name}")
  node = JSON.parse(info[0])
  uri = "http://#{node['address']}:#{node['port']}/"
  # Try to get /status
  check = RestClient::Request.execute(
    method: :get, url: "#{uri}status",
    timeout: 5, open_timeout: 5
  )
  $log.info("druid.zk verified", uri: uri, sources: check) if $log
  # Only a 200 counts as available. Answer false (not nil) for any other
  # status so the result is always a URI or false, as documented.
  return check.code == 200 ? uri : false
rescue
  return false unless @verify_retry < 3
  # Sleep some time and retry
  @verify_retry += 1
  sleep 0.8 * @verify_retry
  retry
ensure
  # Reset verify retries
  @verify_retry = 0
end

#watch_path(service) ⇒ Object

Return the path of a service in Zookeeper.

Parameters:

service

String with the name of the service



192
193
194
# File 'lib/druid_config/zk.rb', line 192

# Build the Zookeeper path of a service: the discovery path and the service
# name joined by a slash.
#
# service - String with the name of the service
def watch_path(service)
  format('%s/%s', @discovery_path, service)
end

#watch_service(service) ⇒ Object

Set a watcher for a service



112
113
114
115
116
117
118
119
120
121
# File 'lib/druid_config/zk.rb', line 112

# Install a child-event callback on the path of a service and remember it in
# @watched_services. Does nothing (and returns nil) when the service already
# has one.
#
# When an event arrives, the callback removes the current watcher and runs
# check_service again for the service.
#
# service - name of the service
#
# Returns the object returned by @zk.register.
def watch_service(service)
  return if @watched_services.key?(service)

  $log.info("druid.zk watch", service: service) if $log
  subscription = @zk.register(watch_path(service), only: :child) do |event|
    if $log
      $log.info("druid.zk got event on watch path for", service: service, event: event)
    end
    unwatch_service(service)
    check_service(service)
  end
  @watched_services.store(service, subscription)
end