Class: Druid::ZK
- Inherits:
 - 
      Object
      
        
- Object
 - Druid::ZK
 
 
- Defined in:
 - lib/druid/zk.rb
 
Instance Method Summary collapse
- #check_service(service) ⇒ Object
 - #check_services ⇒ Object
 - #close! ⇒ Object
 - #data_sources ⇒ Object
 - 
  
    
      #initialize(uri, opts = {})  ⇒ ZK 
    
    
  
  
  
    constructor
  
  
  
  
  
  
  
    
A new instance of ZK.
 - #register ⇒ Object
 - #register_service(service, brokers) ⇒ Object
 - #services ⇒ Object
 - #to_s ⇒ Object
 - #unregister_service(service) ⇒ Object
 - #unwatch_service(service) ⇒ Object
 - #verify_broker(service, name) ⇒ Object
 - #watch_path(service) ⇒ Object
 - #watch_service(service) ⇒ Object
 
Constructor Details
#initialize(uri, opts = {}) ⇒ ZK
Returns a new instance of ZK.
      7 8 9 10 11 12 13  | 
    
      # File 'lib/druid/zk.rb', line 7 def initialize(uri, opts = {}) @zk = ::ZK.new(uri, chroot: :check) @registry = Hash.new { |hash, key| hash[key] = Array.new } @discovery_path = opts[:discovery_path] || '/discovery' @watched_services = Hash.new register end  | 
  
Instance Method Details
#check_service(service) ⇒ Object
      91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113  | 
    
      # File 'lib/druid/zk.rb', line 91 def check_service(service) return if @watched_services.include?(service) watch_service(service) known = @registry[service].map { |node| node[:name] } live = @zk.children(watch_path(service), watch: true) new_list = @registry[service].select { |node| live.include?(node[:name]) } $log.debug("druid.zk checking", service: service, known: known, live: live, new_list: new_list) if $log # verify the new entries to be living brokers (live - known).each do |name| uri, sources = verify_broker(service, name) new_list.push({ name: name, uri: uri, data_sources: sources }) if uri end if new_list.empty? # don't show services w/o active brokers unregister_service(service) else register_service(service, new_list) end end  | 
  
#check_services ⇒ Object
      59 60 61 62 63 64 65 66 67 68 69 70  | 
    
      # File 'lib/druid/zk.rb', line 59 def check_services $log.debug("druid.zk checking services") if $log zk_services = @zk.children(@discovery_path, watch: true) (services - zk_services).each do |service| unregister_service(service) end zk_services.each do |service| check_service(service) end end  | 
  
#close! ⇒ Object
      25 26 27 28  | 
    
      # File 'lib/druid/zk.rb', line 25 def close! $log.debug("druid.zk shutting down") if $log @zk.close! end  | 
  
#data_sources ⇒ Object
      119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135  | 
    
      # File 'lib/druid/zk.rb', line 119 def data_sources result = Hash.new { |hash, key| hash[key] = [] } @registry.each do |service, brokers| brokers.each do |broker| broker[:data_sources].each do |data_source| result["#{service}/#{data_source}"] << broker[:uri] end end end result.each do |source, uris| result[source] = uris.sample if uris.respond_to?(:sample) end result end  | 
  
#register ⇒ Object
      15 16 17 18 19 20 21 22 23  | 
    
      # File 'lib/druid/zk.rb', line 15 def register $log.debug("druid.zk register discovery path") if $log @zk.on_expired_session { register } @zk.register(@discovery_path, only: :child) do |event| $log.debug("druid.zk got event on discovery path") if $log check_services end check_services end  | 
  
#register_service(service, brokers) ⇒ Object
      30 31 32 33 34  | 
    
      # File 'lib/druid/zk.rb', line 30 def register_service(service, brokers) $log.debug("druid.zk register", service: service, brokers: brokers) if $log # poor mans load balancing @registry[service] = brokers.shuffle end  | 
  
#services ⇒ Object
      115 116 117  | 
    
      # File 'lib/druid/zk.rb', line 115 def services @registry.keys end  | 
  
#to_s ⇒ Object
      137 138 139  | 
    
      # File 'lib/druid/zk.rb', line 137 def to_s @registry.to_s end  | 
  
#unregister_service(service) ⇒ Object
      36 37 38 39 40  | 
    
      # File 'lib/druid/zk.rb', line 36 def unregister_service(service) $log.debug("druid.zk unregister", service: service) if $log @registry.delete(service) unwatch_service(service) end  | 
  
#unwatch_service(service) ⇒ Object
      53 54 55 56 57  | 
    
      # File 'lib/druid/zk.rb', line 53 def unwatch_service(service) return unless @watched_services.include?(service) $log.debug("druid.zk unwatch", service: service) if $log @watched_services.delete(service).unregister end  | 
  
#verify_broker(service, name) ⇒ Object
      72 73 74 75 76 77 78 79 80 81 82 83 84 85  | 
    
      # File 'lib/druid/zk.rb', line 72 def verify_broker(service, name) $log.debug("druid.zk verify", broker: name, service: service) if $log info = @zk.get("#{watch_path(service)}/#{name}") node = MultiJson.load(info[0]) uri = "http://#{node['address']}:#{node['port']}/druid/v2/" check = RestClient::Request.execute({ method: :get, url: "#{uri}datasources/", timeout: 5, open_timeout: 5 }) $log.debug("druid.zk verified", uri: uri, sources: check) if $log return [uri, MultiJson.load(check.to_str)] if check.code == 200 rescue return false end  | 
  
#watch_path(service) ⇒ Object
      87 88 89  | 
    
      # File 'lib/druid/zk.rb', line 87 def watch_path(service) "#{@discovery_path}/#{service}" end  | 
  
#watch_service(service) ⇒ Object
      42 43 44 45 46 47 48 49 50 51  | 
    
      # File 'lib/druid/zk.rb', line 42 def watch_service(service) return if @watched_services.include?(service) $log.debug("druid.zk watch", service: service) if $log watch = @zk.register(watch_path(service), only: :child) do |event| $log.debug("druid.zk got event on watch path for", service: service, event: event) if $log unwatch_service(service) check_service(service) end @watched_services[service] = watch end  |