Class: Kazoo::Topic
- Inherits:
-
Object
- Object
- Kazoo::Topic
- Defined in:
- lib/kazoo/topic.rb
Constant Summary collapse
- ALL_PRELOAD_METHODS =
[:partitions, :config].freeze
- DEFAULT_PRELOAD_METHODS =
[:partitions].freeze
- VALID_TOPIC_NAMES =
%r{\A[a-zA-Z0-9\\._\\-]+\z}
- BLACKLISTED_TOPIC_NAMES =
%r{\A\.\.?\z}
Instance Attribute Summary collapse
-
#cluster ⇒ Object
readonly
Returns the value of attribute cluster.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
Class Method Summary collapse
Instance Method Summary collapse
- #add_partitions(partitions: nil, replication_factor: nil) ⇒ Object
- #append_partition(**kwargs) ⇒ Object
- #config ⇒ Object
- #config=(hash) ⇒ Object
- #delete_config(key) ⇒ Object
- #destroy ⇒ Object
- #eql?(other) ⇒ Boolean (also: #==)
- #exists? ⇒ Boolean
- #hash ⇒ Object
-
#initialize(cluster, name, config: nil, partitions: nil) ⇒ Topic
constructor
A new instance of Topic.
- #inspect ⇒ Object
- #load_config_from_zookeeper ⇒ Object
- #load_partitions_from_zookeeper ⇒ Object
- #partition(index, **kwargs) ⇒ Object
- #partitions ⇒ Object
- #partitions=(ps) ⇒ Object
- #replication_factor ⇒ Object
- #reset_default_config ⇒ Object
- #save ⇒ Object
- #set_config(key, value) ⇒ Object
- #set_config_from_json(json_payload) ⇒ Object
- #set_partitions_from_json(json_payload) ⇒ Object
- #under_replicated? ⇒ Boolean
- #valid? ⇒ Boolean
- #validate ⇒ Object
- #write_config_to_zookeeper ⇒ Object
- #write_partitions_to_zookeeper ⇒ Object
Constructor Details
#initialize(cluster, name, config: nil, partitions: nil) ⇒ Topic
Returns a new instance of Topic.
10 11 12 13 14 15 |
# File 'lib/kazoo/topic.rb', line 10
# Builds a topic handle for +name+ on +cluster+.
#
# @param cluster [Object] stored as-is and exposed through #cluster
# @param name [Object] stored as-is and exposed through #name
# @param config [Hash, nil] forwarded to #config=
# @param partitions [Object, nil] forwarded to #partitions=
def initialize(cluster, name, config: nil, partitions: nil)
  @cluster = cluster
  @name = name

  # Go through the writers so any normalisation they perform is applied.
  self.partitions = partitions
  self.config = config
end
Instance Attribute Details
#cluster ⇒ Object (readonly)
Returns the value of attribute cluster.
8 9 10 |
# File 'lib/kazoo/topic.rb', line 8
# Read-only accessor.
#
# @return [Object] the value of the @cluster instance variable
def cluster
  @cluster
end
#name ⇒ Object (readonly)
Returns the value of attribute name.
8 9 10 |
# File 'lib/kazoo/topic.rb', line 8
# Read-only accessor.
#
# @return [Object] the value of the @name instance variable
def name
  @name
end
Class Method Details
.create(cluster, name, partitions: nil, replication_factor: nil, config: {}) ⇒ Object
250 251 252 253 254 255 256 257 258 259 260 261 262 263 |
# File 'lib/kazoo/topic.rb', line 250
# Builds a new topic, assigns replicas for every requested partition and
# persists it with #save.
#
# @param cluster [Object] cluster handle handed to the new topic and to
#   Kazoo::ReplicaAssigner
# @param name [String] topic name
# @param partitions [Integer] number of partitions to create; required even
#   though the keyword defaults to nil. Coerced with Kernel#Integer.
# @param replication_factor [Object] passed to ReplicaAssigner#assign once
#   per partition
# @param config [Hash] initial topic configuration
# @return [Object] the topic instance after #save
# @raise [ArgumentError] if partitions is nil
# @raise [Kazoo::Error] if the topic already exists
def self.create(cluster, name, partitions: nil, replication_factor: nil, config: {})
  # Fail fast with a descriptive error; previously a nil count surfaced as a
  # NoMethodError from nil.times after the existence check had already run.
  raise ArgumentError, "partitions must be a positive integer" if partitions.nil?
  partition_count = Integer(partitions)

  topic = new(cluster, name, config: config, partitions: [])
  raise Kazoo::Error, "Topic #{name} already exists" if topic.exists?

  replica_assigner = Kazoo::ReplicaAssigner.new(cluster)

  # A non-positive count leaves the topic without partitions; #save runs
  # #validate, which rejects a topic that has no partitions.
  partition_count.times do
    replicas = replica_assigner.assign(replication_factor)
    topic.append_partition(replicas: replicas)
  end

  topic.save
  topic
end
Instance Method Details
#add_partitions(partitions: nil, replication_factor: nil) ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/kazoo/topic.rb', line 78
# Appends new partitions to an existing topic and writes the updated
# partition list to Zookeeper.
#
# @param partitions [Integer, String] how many partitions to add; coerced
#   with Kernel#Integer
# @param replication_factor [Integer, String] replicas per new partition;
#   coerced with Kernel#Integer
# @raise [ArgumentError] if either value is not positive
# @raise [Kazoo::TopicNotFound] if the topic does not exist
def add_partitions(partitions: nil, replication_factor: nil)
  # Keep the coerced integers: the checks accept anything Integer() accepts
  # (e.g. "2"), so the loop below must not call #times on the raw argument.
  partition_count = Integer(partitions)
  raise ArgumentError, "partitions must be a positive integer" if partition_count <= 0

  replica_count = Integer(replication_factor)
  raise ArgumentError, "replication_factor must be a positive integer" if replica_count <= 0

  raise Kazoo::TopicNotFound, "The topic #{name} does not exists!" unless exists?

  replica_assigner = Kazoo::ReplicaAssigner.new(cluster)

  partition_count.times do
    replicas = replica_assigner.assign(replica_count)
    append_partition(replicas: replicas)
  end

  validate
  write_partitions_to_zookeeper
  wait_for_partitions

  # NOTE(review): the extracted listing was truncated to "cluster." here;
  # reset_metadata is restored from the upstream kazoo source — confirm.
  cluster.reset_metadata
end
#append_partition(**kwargs) ⇒ Object
29 30 31 32 33 |
# File 'lib/kazoo/topic.rb', line 29
# Builds a partition whose index is the current partition count, appends it
# to #partitions and returns it.
#
# @param kwargs [Hash] forwarded unchanged to #partition
# @return [Object] the partition that was appended
def append_partition(**kwargs)
  partition(partitions.length, **kwargs).tap do |added|
    partitions << added
  end
end
#config ⇒ Object
143 144 145 |
# File 'lib/kazoo/topic.rb', line 143
# Returns the topic configuration, loading it through
# #load_config_from_zookeeper the first time it is needed and caching it in
# @config afterwards.
#
# @return [Object] the cached or freshly loaded configuration
def config
  return @config if @config

  @config = load_config_from_zookeeper
end
#config=(hash) ⇒ Object
147 148 149 150 |
# File 'lib/kazoo/topic.rb', line 147
# Replaces the cached configuration with a copy of +hash+ whose keys and
# values are all converted with #to_s. Passing nil leaves @config untouched.
#
# @param hash [Hash, nil] new configuration
def config=(hash)
  return if hash.nil?

  # each_with_object carries the accumulator itself, so the block no longer
  # needs the trailing "; h" that inject required.
  @config = hash.each_with_object({}) do |(key, value), normalized|
    normalized[key.to_s] = value.to_s
  end
end
#delete_config(key) ⇒ Object
157 158 159 160 |
# File 'lib/kazoo/topic.rb', line 157
# Removes one entry from the configuration and writes the result with
# #write_config_to_zookeeper.
#
# @param key [#to_s] configuration key; converted with #to_s before lookup
# @return [Object] whatever #write_config_to_zookeeper returns
def delete_config(key)
  config_key = key.to_s
  config.delete(config_key)
  write_config_to_zookeeper
end
#destroy ⇒ Object
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/kazoo/topic.rb', line 108
# Marks the topic for deletion by creating /admin/delete_topics/<name> and
# blocks the calling thread until the watcher on /brokers/topics/<name>
# reports that the node was deleted.
#
# @raise [Kazoo::TopicNotFound] if the topic node does not exist
# @raise [Kazoo::Error] if the watch cannot be set, the topic is already
#   marked for deletion, or the delete marker cannot be created
def destroy
  t = Thread.current

  # The watcher wakes the calling thread once the topic node is deleted.
  cb = Zookeeper::Callbacks::WatcherCallback.create do |event|
    case event.type
    when Zookeeper::Constants::ZOO_DELETED_EVENT
      t.run if t.status == 'sleep'
    else
      raise Kazoo::Error, "Unexpected Zookeeper event: #{event.type}"
    end
  end

  # Register the watch before requesting deletion so the event is not missed.
  result = cluster.zk.stat(path: "/brokers/topics/#{name}", watcher: cb)
  case result.fetch(:rc)
  when Zookeeper::Constants::ZOK
    # continue
  when Zookeeper::Constants::ZNONODE
    raise Kazoo::TopicNotFound, "Topic #{name} does not exist!"
  else
    raise Kazoo::Error, "Failed to monitor topic"
  end

  result = cluster.zk.create(path: "/admin/delete_topics/#{name}")
  case result.fetch(:rc)
  when Zookeeper::Constants::ZOK
    # NOTE(review): if the watcher fires between the completed? check and
    # Thread.stop, the wake-up above is skipped (thread not yet asleep) and
    # this call may block indefinitely — left unchanged, worth confirming.
    Thread.stop unless cb.completed?
  when Zookeeper::Constants::ZNODEEXISTS
    raise Kazoo::Error, "The topic #{name} is already marked for deletion!"
  else
    raise Kazoo::Error, "Failed to delete topic #{name}. Error code: #{result.fetch(:rc)}"
  end

  # NOTE(review): the extracted listing was truncated to "cluster." here;
  # reset_metadata is restored from the upstream kazoo source — confirm.
  cluster.reset_metadata
end
#eql?(other) ⇒ Boolean Also known as: ==
47 48 49 |
# File 'lib/kazoo/topic.rb', line 47
# Two topics are equal when both are Kazoo::Topic instances with equal
# cluster and equal name.
#
# @param other [Object] object to compare against
# @return [Boolean]
def eql?(other)
  return false unless other.kind_of?(Kazoo::Topic)

  cluster == other.cluster && name == other.name
end
#exists? ⇒ Boolean
57 58 59 60 |
# File 'lib/kazoo/topic.rb', line 57
# Asks Zookeeper whether the node /brokers/topics/<name> exists.
#
# @return [Boolean] result of #exists? on the :stat entry of the response
def exists?
  topic_path = "/brokers/topics/#{name}"
  cluster.zk.stat(path: topic_path).fetch(:stat).exists?
end
#hash ⇒ Object
53 54 55 |
# File 'lib/kazoo/topic.rb', line 53
# Hash code derived from the same two attributes that #eql? compares, so
# equal topics land in the same Hash bucket.
#
# @return [Integer]
def hash
  identity = [cluster, name]
  identity.hash
end
#inspect ⇒ Object
43 44 45 |
# File 'lib/kazoo/topic.rb', line 43 def inspect "#<Kazoo::Topic #{name}>" end |
#load_config_from_zookeeper ⇒ Object
195 196 197 198 199 200 201 202 203 204 205 206 207 |
# File 'lib/kazoo/topic.rb', line 195
# Reads /config/topics/<name> from Zookeeper and returns the parsed config.
#
# @return [Hash] an empty Hash when the node does not exist, otherwise the
#   config produced by #set_config_from_json
# @raise [Kazoo::Error] on any result code other than ZOK or ZNONODE
def load_config_from_zookeeper
  response = cluster.zk.get(path: "/config/topics/#{name}")
  rc = response.fetch(:rc)

  # A missing config node just means no overrides were ever written.
  return {} if rc == Zookeeper::Constants::ZNONODE
  raise Kazoo::Error, "Failed to retrieve topic config" unless rc == Zookeeper::Constants::ZOK

  set_config_from_json(response.fetch(:data)).config
end
#load_partitions_from_zookeeper ⇒ Object
189 190 191 192 193 |
# File 'lib/kazoo/topic.rb', line 189
# Reads /brokers/topics/<name> from Zookeeper and returns the partitions
# built by #set_partitions_from_json.
#
# @return [Object] the partitions parsed from the node data
# @raise [Kazoo::Error] if the result code is not ZOK
def load_partitions_from_zookeeper
  response = cluster.zk.get(path: "/brokers/topics/#{name}")

  if response.fetch(:rc) != Zookeeper::Constants::ZOK
    raise Kazoo::Error, "Failed to get list of partitions for #{name}. Result code: #{response.fetch(:rc)}"
  end

  set_partitions_from_json(response.fetch(:data)).partitions
end
#partition(index, **kwargs) ⇒ Object
25 26 27 |
# File 'lib/kazoo/topic.rb', line 25
# Builds a Kazoo::Partition for this topic. The partition is only
# constructed here; it is not added to #partitions.
#
# @param index [Object] partition index handed to Kazoo::Partition.new
# @param kwargs [Hash] extra keyword arguments forwarded unchanged
# @return [Kazoo::Partition]
def partition(index, **kwargs)
  Kazoo::Partition.new(self, index, **kwargs)
end
#partitions ⇒ Object
17 18 19 |
# File 'lib/kazoo/topic.rb', line 17
# Returns the topic's partitions, loading them through
# #load_partitions_from_zookeeper the first time they are needed and caching
# them in @partitions afterwards.
#
# @return [Object] the cached or freshly loaded partitions
def partitions
  return @partitions if @partitions

  @partitions = load_partitions_from_zookeeper
end
#partitions=(ps) ⇒ Object
21 22 23 |
# File 'lib/kazoo/topic.rb', line 21
# Replaces the cached partition list without any conversion.
#
# @param ps [Object] new value for @partitions; nil makes the next call to
#   #partitions load from Zookeeper again
def partitions=(ps)
  @partitions = ps
end
#replication_factor ⇒ Object
35 36 37 |
# File 'lib/kazoo/topic.rb', line 35
# The smallest replication factor found among the topic's partitions.
#
# @return [Object, nil] the minimum, or nil when there are no partitions
def replication_factor
  factors = partitions.map { |part| part.replication_factor }
  factors.min
end
#reset_default_config ⇒ Object
162 163 164 165 |
# File 'lib/kazoo/topic.rb', line 162
# Clears every configuration entry and writes the empty configuration with
# #write_config_to_zookeeper.
#
# @return [Object] whatever #write_config_to_zookeeper returns
def reset_default_config
  @config = {}

  write_config_to_zookeeper
end
#save ⇒ Object
97 98 99 100 101 102 103 104 105 106 |
# File 'lib/kazoo/topic.rb', line 97
# Persists a topic that does not exist yet: validates it, writes its config
# and partition assignment to Zookeeper, then calls #wait_for_partitions.
#
# @raise [Kazoo::Error] if the topic already exists
def save
  raise Kazoo::Error, "The topic #{name} already exists!" if exists?
  validate

  write_config_to_zookeeper
  write_partitions_to_zookeeper

  wait_for_partitions

  # NOTE(review): the extracted listing was truncated to "cluster." here;
  # reset_metadata is restored from the upstream kazoo source — confirm.
  cluster.reset_metadata
end
#set_config(key, value) ⇒ Object
152 153 154 155 |
# File 'lib/kazoo/topic.rb', line 152
# Sets one configuration entry and writes the configuration with
# #write_config_to_zookeeper.
#
# @param key [#to_s] configuration key, stored as a String
# @param value [#to_s] configuration value, stored as a String
# @return [Object] whatever #write_config_to_zookeeper returns
def set_config(key, value)
  config.store(key.to_s, value.to_s)
  write_config_to_zookeeper
end
#set_config_from_json(json_payload) ⇒ Object
180 181 182 183 184 185 186 187 |
# File 'lib/kazoo/topic.rb', line 180
# Parses a JSON document and stores its 'config' entry in @config.
#
# @param json_payload [String] JSON text with 'version' and 'config' keys
# @return [self] so a reader can be chained onto the call
# @raise [Kazoo::VersionNotSupported] if 'version' is not 1
# @raise [KeyError] if 'version' or 'config' is missing
def set_config_from_json(json_payload)
  parsed = JSON.parse(json_payload)
  raise Kazoo::VersionNotSupported unless parsed.fetch('version') == 1

  @config = parsed.fetch('config')
  self
end
#set_partitions_from_json(json_payload) ⇒ Object
167 168 169 170 171 172 173 174 175 176 177 178 |
# File 'lib/kazoo/topic.rb', line 167
# Parses a JSON document and rebuilds @partitions from its 'partitions'
# entry, ordered by partition id.
#
# @param json_payload [String] JSON text with 'version' and 'partitions'
#   keys; each 'partitions' entry pairs an id with a list of broker keys
#   that are looked up in cluster.brokers
# @return [self] so a reader can be chained onto the call
# @raise [Kazoo::VersionNotSupported] if 'version' is not 1
def set_partitions_from_json(json_payload)
  parsed = JSON.parse(json_payload)
  raise Kazoo::VersionNotSupported unless parsed.fetch('version') == 1

  assignments = parsed.fetch('partitions')
  @partitions = assignments.map do |id, broker_ids|
    brokers = broker_ids.map { |broker_id| cluster.brokers[broker_id] }
    partition(id.to_i, replicas: brokers)
  end

  # JSON object keys carry no ordering guarantee that matters here, so sort
  # explicitly by id.
  @partitions.sort_by!(&:id)

  self
end
#under_replicated? ⇒ Boolean
39 40 41 |
# File 'lib/kazoo/topic.rb', line 39
# Whether at least one partition of the topic reports itself as
# under-replicated.
#
# @return [Boolean]
def under_replicated?
  partitions.any? { |part| part.under_replicated? }
end
#valid? ⇒ Boolean
72 73 74 75 76 |
# File 'lib/kazoo/topic.rb', line 72
# Non-raising variant of #validate.
#
# @return [Object] the result of #validate, or false when it raised
#   Kazoo::ValidationError
def valid?
  validate
rescue Kazoo::ValidationError
  # Any other exception type is deliberately left to propagate.
  false
end
#validate ⇒ Object
62 63 64 65 66 67 68 69 70 |
# File 'lib/kazoo/topic.rb', line 62
# Checks the topic name and partitions, raising on the first problem found.
#
# NOTE(review): VALID_TOPIC_NAMES as defined in this class contains "\\"
# inside its character class, which matches a literal backslash — confirm
# whether backslashes are really meant to be accepted in topic names.
#
# @return [true] when every check passes
# @raise [Kazoo::ValidationError] if the name does not match
#   VALID_TOPIC_NAMES, matches BLACKLISTED_TOPIC_NAMES, is longer than 255
#   characters, or the topic has no partitions
def validate
  raise Kazoo::ValidationError, "#{name} is not a valid topic name" unless VALID_TOPIC_NAMES =~ name
  raise Kazoo::ValidationError, "#{name} is not a valid topic name" if BLACKLISTED_TOPIC_NAMES =~ name
  raise Kazoo::ValidationError, "#{name} is too long" if name.length > 255
  raise Kazoo::ValidationError, "The topic has no partitions defined" if partitions.length.zero?

  # Each partition performs its own validation and raises on failure.
  partitions.each { |part| part.validate }

  true
end
#write_config_to_zookeeper ⇒ Object
227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 |
# File 'lib/kazoo/topic.rb', line 227
# Writes the topic configuration to /config/topics/<name>, creating the
# node when it does not exist, and then creates a sequential
# /config/changes/config_change_ node carrying the topic name.
#
# @raise [Kazoo::Error] if the config or the change notification cannot be
#   written
def write_config_to_zookeeper
  config_hash = config.each_with_object({}) { |(k, v), h| h[k.to_s] = v.to_s }
  config_json = JSON.generate(version: 1, config: config_hash)
  path = "/config/topics/#{name}"

  # Set topic config; fall back to creating the node if it is missing.
  result = cluster.zk.set(path: path, data: config_json)
  case result.fetch(:rc)
  when Zookeeper::Constants::ZOK
    # continue
  when Zookeeper::Constants::ZNONODE
    result = cluster.zk.create(path: path, data: config_json)
    raise Kazoo::Error, "Failed to write topic config to zookeeper. Result code: #{result.fetch(:rc)}" unless result.fetch(:rc) == Zookeeper::Constants::ZOK
  else
    raise Kazoo::Error, "Failed to write topic config to zookeeper. Result code: #{result.fetch(:rc)}"
  end

  # Set config change notification
  result = cluster.zk.create(path: "/config/changes/config_change_", data: name.inspect, sequence: true)
  raise Kazoo::Error, "Failed to set topic config change notification" unless result.fetch(:rc) == Zookeeper::Constants::ZOK

  # NOTE(review): the extracted listing was truncated to "cluster." here;
  # reset_metadata is restored from the upstream kazoo source — confirm.
  cluster.reset_metadata
end
#write_partitions_to_zookeeper ⇒ Object
209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 |
# File 'lib/kazoo/topic.rb', line 209
# Writes the partition assignment to /brokers/topics/<name>, creating the
# node when it does not exist.
#
# @raise [Kazoo::Error] if the node can be neither updated nor created
def write_partitions_to_zookeeper
  path = "/brokers/topics/#{name}"
  data = JSON.generate(version: 1, partitions: partitions_as_json)

  # Try an update first; fall back to creating the node if it is missing.
  result = cluster.zk.set(path: path, data: data)
  case result.fetch(:rc)
  when Zookeeper::Constants::ZOK
    # continue
  when Zookeeper::Constants::ZNONODE
    result = cluster.zk.create(path: path, data: data)
    raise Kazoo::Error, "Failed to write partitions to zookeeper. Result code: #{result.fetch(:rc)}" unless result.fetch(:rc) == Zookeeper::Constants::ZOK
  else
    raise Kazoo::Error, "Failed to write partitions to zookeeper. Result code: #{result.fetch(:rc)}"
  end

  # NOTE(review): the extracted listing was truncated to "cluster." here;
  # reset_metadata is restored from the upstream kazoo source — confirm.
  cluster.reset_metadata
end