Class: Nerve::Reporter::Zookeeper

Inherits:
Base
  • Object
show all
Defined in:
lib/nerve/reporter/zookeeper.rb

Constant Summary collapse

@@zk_pool =
{}
@@zk_pool_count =
{}
@@zk_pool_lock =
Mutex.new

Instance Method Summary collapse

Methods inherited from Base

#get_service_data

Methods included from Logging

configure_logger_for, #log, logger_for

Methods included from Utils

#responsive_sleep, #safe_run

Constructor Details

#initialize(service) ⇒ Zookeeper

Returns a new instance of Zookeeper.



12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/nerve/reporter/zookeeper.rb', line 12

def initialize(service)
  %w{zk_hosts zk_path}.each do |required|
    raise ArgumentError, "missing required argument #{required} for new service watcher" unless service[required]
  end
  # Since we pool we get one connection per zookeeper cluster
  @zk_connection_string = service['zk_hosts'].sort.join(',')
  @data = parse_data(get_service_data(service))

  @zk_path = service['zk_path']
  @key_prefix = @zk_path + "/#{service['instance_id']}_"
  @full_key = nil
end

Instance Method Details

#ping?Boolean

Returns:

  • (Boolean)


72
73
74
# File 'lib/nerve/reporter/zookeeper.rb', line 72

def ping?
  return @zk.connected? && @zk.exists?(@full_key || '/')
end

#report_downObject



68
69
70
# File 'lib/nerve/reporter/zookeeper.rb', line 68

def report_down
  zk_delete
end

#report_upObject



64
65
66
# File 'lib/nerve/reporter/zookeeper.rb', line 64

def report_up()
  zk_save
end

#startObject



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/nerve/reporter/zookeeper.rb', line 25

def start()
  log.info "nerve: waiting to connect to zookeeper to #{@zk_connection_string}"
  # Ensure that all Zookeeper reporters re-use a single zookeeper
  # connection to any given set of zk hosts.
  @@zk_pool_lock.synchronize {
    unless @@zk_pool.has_key?(@zk_connection_string)
      log.info "nerve: creating pooled connection to #{@zk_connection_string}"
      @@zk_pool[@zk_connection_string] = ZK.new(@zk_connection_string, :timeout => 5)
      @@zk_pool_count[@zk_connection_string] = 1
      log.info "nerve: successfully created zk connection to #{@zk_connection_string}"
    else
      @@zk_pool_count[@zk_connection_string] += 1
      log.info "nerve: re-using existing zookeeper connection to #{@zk_connection_string}"
    end
    @zk = @@zk_pool[@zk_connection_string]
    log.info "nerve: retrieved zk connection to #{@zk_connection_string}"
  }
end

#stopObject



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/nerve/reporter/zookeeper.rb', line 44

def stop()
  log.info "nerve: removing zk node at #{@full_key}" if @full_key
  begin
    report_down
  ensure
    @@zk_pool_lock.synchronize {
      @@zk_pool_count[@zk_connection_string] -= 1
      # Last thread to use the connection closes it
      if @@zk_pool_count[@zk_connection_string] == 0
        log.info "nerve: closing zk connection to #{@zk_connection_string}"
        begin
          @zk.close!
        ensure
          @@zk_pool.delete(@zk_connection_string)
        end
      end
    }
  end
end