Class: Ktl::ZookeeperClient

Inherits:
Object
  • Object
show all
Defined in:
lib/ktl/zookeeper_client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(uri, options = {}) ⇒ ZookeeperClient

Returns a new instance of ZookeeperClient.



7
8
9
10
11
# File 'lib/ktl/zookeeper_client.rb', line 7

def initialize(uri, options={})
  @uri = uri
  @threadpool = options[:threadpool] || JavaConcurrent::Executors.new_fixed_thread_pool(CONCURRENCY)
  @utils = options[:utils] || Kafka::Utils::ZkUtils.apply(@uri, 5000, 5000, false)
end

Instance Attribute Details

#utilsObject (readonly)

Returns the value of attribute utils.



5
6
7
# File 'lib/ktl/zookeeper_client.rb', line 5

def utils
  @utils
end

Instance Method Details

#all_partitionsObject



27
28
29
# File 'lib/ktl/zookeeper_client.rb', line 27

def all_partitions
  @utils.get_all_partitions
end

#all_topicsObject



31
32
33
# File 'lib/ktl/zookeeper_client.rb', line 31

def all_topics
  @utils.get_all_topics
end

#broker_idsObject



39
40
41
# File 'lib/ktl/zookeeper_client.rb', line 39

def broker_ids
  @utils.get_sorted_broker_list
end

#brokersObject



35
36
37
# File 'lib/ktl/zookeeper_client.rb', line 35

def brokers
  @utils.get_all_brokers_in_cluster
end

#closeObject



18
19
20
21
# File 'lib/ktl/zookeeper_client.rb', line 18

def close
  @threadpool.shutdown_now if @threadpool
  @utils.close
end

#create_znode(path, data = '') ⇒ Object



63
64
65
# File 'lib/ktl/zookeeper_client.rb', line 63

def create_znode(path, data='')
  @utils.create_persistent_path(path, data, no_acl)
end

#delete_znode(path, options = {}) ⇒ Object



67
68
69
70
71
72
73
# File 'lib/ktl/zookeeper_client.rb', line 67

def delete_znode(path, options={})
  if options[:recursive]
    @utils.delete_path_recursive(path)
  else
    @utils.delete_path(path)
  end
end

#exists?(path) ⇒ Boolean

Returns:

  • (Boolean)


83
84
85
# File 'lib/ktl/zookeeper_client.rb', line 83

def exists?(path)
  @utils.path_exists(path)
end

#get_children(path) ⇒ Object



79
80
81
# File 'lib/ktl/zookeeper_client.rb', line 79

def get_children(path)
  @utils.get_children(path)
end

#leader_and_isr_for(partitions) ⇒ Object



43
44
45
# File 'lib/ktl/zookeeper_client.rb', line 43

def leader_and_isr_for(partitions)
  @utils.get_partition_leader_and_isr_for_topics(@utils.class.create_zk_client(@uri, 5_000, 5_000), partitions)
end

#partitions_being_reassignedObject



55
56
57
# File 'lib/ktl/zookeeper_client.rb', line 55

def partitions_being_reassigned
  @utils.get_partitions_being_reassigned
end

#partitions_for_topics(topics) ⇒ Object



47
48
49
# File 'lib/ktl/zookeeper_client.rb', line 47

def partitions_for_topics(topics)
  request(:get_partitions_for_topics, topics)
end

#raw_clientObject



23
24
25
# File 'lib/ktl/zookeeper_client.rb', line 23

def raw_client
  @utils
end

#read_data(path) ⇒ Object



75
76
77
# File 'lib/ktl/zookeeper_client.rb', line 75

def read_data(path)
  @utils.read_data(path)
end

#reassign_partitions(json) ⇒ Object



59
60
61
# File 'lib/ktl/zookeeper_client.rb', line 59

def reassign_partitions(json)
  @utils.create_persistent_path(@utils.class.reassign_partitions_path, json, no_acl)
end

#replica_assignment_for_topics(topics) ⇒ Object



51
52
53
# File 'lib/ktl/zookeeper_client.rb', line 51

def replica_assignment_for_topics(topics)
  request(:get_replica_assignment_for_topics, topics)
end

#setupObject



13
14
15
16
# File 'lib/ktl/zookeeper_client.rb', line 13

def setup
  @submit = @threadpool.java_method(:submit, [java.lang.Class.for_name('java.util.concurrent.Callable')])
  self
end