Class: Druid::ZK

Inherits:
Object
  • Object
show all
Defined in:
lib/druid/zk.rb

Instance Method Summary collapse

Constructor Details

#initialize(uri, opts = {}) ⇒ ZK

Returns a new instance of ZK.



7
8
9
10
11
12
13
# File 'lib/druid/zk.rb', line 7

# Connects to ZooKeeper and starts service discovery.
#
# @param uri [String] connection string handed to ::ZK.new
# @param opts [Hash] optional settings; only :discovery_path is read here
#   (falls back to '/discovery' when absent or falsy)
def initialize(uri, opts = {})
  @zk = ::ZK.new(uri, chroot: :check)
  @discovery_path = opts[:discovery_path] || '/discovery'
  # service name => list of broker hashes; unknown services start out empty
  @registry = Hash.new { |registry, service| registry[service] = [] }
  # service name => subscription returned by @zk.register for that service
  @watched_services = {}
  register
end

Instance Method Details

#check_service(service) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/druid/zk.rb', line 91

# Refreshes the broker list of a single service.
#
# Does nothing when a watch is already registered for the service. Otherwise
# it registers the watch, asks ZooKeeper for the children of the service
# path (requesting a watch), keeps the already-known brokers that are still
# listed, verifies every newly listed broker and finally registers the
# service with the resulting list, or unregisters it when no broker is left.
#
# @param service [String] service name (child of the discovery path)
# @return [Object, nil] nil when skipped, otherwise whatever
#   register_service / unregister_service returned
def check_service(service)
  return if @watched_services.key?(service)

  watch_service(service)

  known_names = @registry[service].map { |broker| broker[:name] }
  live_names = @zk.children(watch_path(service), watch: true)
  brokers = @registry[service].select { |broker| live_names.include?(broker[:name]) }
  if $log
    $log.debug("druid.zk checking", service: service, known: known_names, live: live_names, new_list: brokers)
  end

  # only names we have not seen before need to prove they are living brokers
  (live_names - known_names).each do |name|
    uri, sources = verify_broker(service, name)
    next unless uri

    brokers << { name: name, uri: uri, data_sources: sources }
  end

  # services without any active broker are not shown
  return unregister_service(service) if brokers.empty?

  register_service(service, brokers)
end

#check_services ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/druid/zk.rb', line 59

# Synchronises the registry with the services listed under the discovery path.
#
# Reads the children of the discovery path (requesting a watch), unregisters
# every registered service that is no longer listed and then runs
# check_service for each listed one.
#
# @return [Array] the child names returned by ZooKeeper
def check_services
  $log.debug("druid.zk checking services") if $log
  listed = @zk.children(@discovery_path, watch: true)

  vanished = services - listed
  vanished.each { |service| unregister_service(service) }

  listed.each { |service| check_service(service) }
end

#close! ⇒ Object



25
26
27
28
# File 'lib/druid/zk.rb', line 25

# Closes the underlying ZooKeeper client.
#
# @return [Object] whatever the client's close! returns
def close!
  if $log
    $log.debug("druid.zk shutting down")
  end

  @zk.close!
end

#data_sources ⇒ Object



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/druid/zk.rb', line 119

# Maps every known data source to the URI of one broker serving it.
#
# Keys have the form "service/data_source". When several registered brokers
# list the same data source, one of their URIs is picked at random on each
# call.
#
# The returned hash is a plain Hash: looking up an unknown data source
# yields nil and does not add a key. (The collecting hash used internally
# creates an empty array on every miss; handing that one out would make
# unknown sources look present.)
#
# @return [Hash{String => String}] "service/data_source" => broker URI
def data_sources
  candidates = Hash.new { |hash, key| hash[key] = [] }

  @registry.each do |service, brokers|
    brokers.each do |broker|
      broker[:data_sources].each do |data_source|
        candidates["#{service}/#{data_source}"] << broker[:uri]
      end
    end
  end

  candidates.each_with_object({}) do |(source, uris), picked|
    picked[source] = uris.sample
  end
end

#register ⇒ Object



15
16
17
18
19
20
21
22
23
# File 'lib/druid/zk.rb', line 15

# Hooks this instance up to the discovery path and performs an initial scan.
#
# Subscribes to session expiry (which re-runs this method) and to child
# events on the discovery path (which re-run check_services), then calls
# check_services once.
#
# Each subscription is created only once and remembered. This method is
# re-entered from the expired-session callback; subscribing again on every
# re-entry would stack up handlers, so each expiry / child event would fire
# more and more callbacks.
#
# @return [Object] whatever check_services returns
def register
  $log.debug("druid.zk register discovery path") if $log

  unless @expired_session_subscription
    @expired_session_subscription = @zk.on_expired_session { register }
  end

  unless @discovery_subscription
    @discovery_subscription = @zk.register(@discovery_path, only: :child) do |_event|
      $log.debug("druid.zk got event on discovery path") if $log
      check_services
    end
  end

  check_services
end

#register_service(service, brokers) ⇒ Object



30
31
32
33
34
# File 'lib/druid/zk.rb', line 30

# Stores the broker list of a service in the registry.
#
# @param service [String] service name
# @param brokers [Array] broker entries; the caller's array is left untouched
# @return [Array] the shuffled copy that was stored
def register_service(service, brokers)
  if $log
    $log.debug("druid.zk register", service: service, brokers: brokers)
  end

  # poor man's load balancing: store the brokers in random order
  shuffled = brokers.shuffle
  @registry[service] = shuffled
end

#services ⇒ Object



115
116
117
# File 'lib/druid/zk.rb', line 115

# Lists the names of the services that currently have a registry entry.
#
# @return [Array] the keys of the registry hash
def services
  @registry.keys
end

#to_s ⇒ Object



137
138
139
# File 'lib/druid/zk.rb', line 137

# String form of this instance: delegates to the registry's own #to_s.
#
# @return [String]
def to_s
  @registry.to_s
end

#unregister_service(service) ⇒ Object



36
37
38
39
40
# File 'lib/druid/zk.rb', line 36

# Drops a service from the registry and stops watching it.
#
# @param service [String] service name
# @return [Object, nil] whatever unwatch_service returns
def unregister_service(service)
  if $log
    $log.debug("druid.zk unregister", service: service)
  end

  @registry.delete(service)
  unwatch_service(service)
end

#unwatch_service(service) ⇒ Object



53
54
55
56
57
# File 'lib/druid/zk.rb', line 53

# Removes the stored watch subscription of a service and unregisters it.
#
# @param service [String] service name
# @return [Object, nil] nil when the service is not watched, otherwise the
#   result of the subscription's unregister
def unwatch_service(service)
  return unless @watched_services.key?(service)

  $log.debug("druid.zk unwatch", service: service) if $log
  subscription = @watched_services.delete(service)
  subscription.unregister
end

#verify_broker(service, name) ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/druid/zk.rb', line 72

# Checks that a node listed under a service is a reachable broker.
#
# Reads the node's JSON payload from ZooKeeper, builds the broker base URI
# from its 'address' and 'port' fields and requests "<uri>datasources/" with
# 5 second open/read timeouts.
#
# Any failure (ZooKeeper read, JSON parsing, HTTP request, non-200 answer)
# makes the broker count as not verified. Failures are logged instead of
# being dropped silently, and the failure value is always false.
#
# @param service [String] service name
# @param name [String] node name below the service's watch path
# @return [Array, false] [uri, parsed data source list] on success,
#   false otherwise
def verify_broker(service, name)
  $log.debug("druid.zk verify", broker: name, service: service) if $log
  info = @zk.get("#{watch_path(service)}/#{name}")
  node = MultiJson.load(info[0])
  uri = "http://#{node['address']}:#{node['port']}/druid/v2/"
  check = RestClient::Request.execute({
    method: :get, url: "#{uri}datasources/",
    timeout: 5, open_timeout: 5
  })
  $log.debug("druid.zk verified", uri: uri, sources: check) if $log
  return false unless check.code == 200

  [uri, MultiJson.load(check.to_str)]
rescue StandardError => e
  # best effort: an unreachable or malformed node simply is not a broker
  $log.debug("druid.zk verification failed", broker: name, service: service, error: e.message) if $log
  false
end

#watch_path(service) ⇒ Object



87
88
89
# File 'lib/druid/zk.rb', line 87

# Builds the ZooKeeper path of a service below the discovery path.
#
# @param service [String] service name
# @return [String] "<discovery path>/<service>"
def watch_path(service)
  format('%s/%s', @discovery_path, service)
end

#watch_service(service) ⇒ Object



42
43
44
45
46
47
48
49
50
51
# File 'lib/druid/zk.rb', line 42

# Registers a child-event handler on the service's path, unless one is
# already stored for that service.
#
# When the handler fires it drops the service's watch entry and then
# re-checks the service.
#
# @param service [String] service name
# @return [Object, nil] nil when already watched, otherwise the subscription
#   returned by the client, which is also stored under the service name
def watch_service(service)
  return if @watched_services.key?(service)

  $log.debug("druid.zk watch", service: service) if $log
  subscription = @zk.register(watch_path(service), only: :child) do |event|
    if $log
      $log.debug("druid.zk got event on watch path for", service: service, event: event)
    end
    # check_service skips watched services, so the entry has to go first
    unwatch_service(service)
    check_service(service)
  end
  @watched_services[service] = subscription
end