Class: Kazoo::Topic

Inherits:
Object
  • Object
show all
Defined in:
lib/kazoo/topic.rb

Constant Summary collapse

# NOTE(review): presumably the attribute names that can be eagerly loaded for a
# topic, and the subset loaded by default; their consumers are not visible here.
ALL_PRELOAD_METHODS = [:partitions, :config].freeze
DEFAULT_PRELOAD_METHODS = [:partitions].freeze

# A topic name may only contain ASCII letters, digits, ".", "_" and "-".
# Inside a Ruby regexp character class "\\" is a literal backslash, so the
# previous pattern ([a-zA-Z0-9\\._\\-]) also accepted names containing "\".
# "." and a trailing "-" need no escaping inside a character class.
VALID_TOPIC_NAMES = %r{\A[a-zA-Z0-9._-]+\z}

# "." and ".." are rejected by #validate even though they match VALID_TOPIC_NAMES.
BLACKLISTED_TOPIC_NAMES = %r{\A\.\.?\z}

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(cluster, name, config: nil, partitions: nil) ⇒ Topic

Returns a new instance of Topic.



10
11
12
13
14
15
# File 'lib/kazoo/topic.rb', line 10

# Builds a topic object for +name+ inside +cluster+.
#
# @param cluster the owning cluster; exposed through #cluster
#   (presumably a Kazoo::Cluster — confirm against callers)
# @param name the topic name; exposed through #name
# @param config [Hash, nil] handed to #config=; nil leaves the config unset
# @param partitions [Array, nil] handed to #partitions=
def initialize(cluster, name, config: nil, partitions: nil)
  @cluster = cluster
  @name = name

  self.partitions = partitions
  self.config = config
end

Instance Attribute Details

#cluster ⇒ Object (readonly)

Returns the value of attribute cluster.



8
9
10
# File 'lib/kazoo/topic.rb', line 8

# Returns the cluster object this topic was constructed with.
def cluster
  @cluster
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



8
9
10
# File 'lib/kazoo/topic.rb', line 8

# Returns the topic name this topic was constructed with.
def name
  @name
end

Class Method Details

.create(cluster, name, partitions: nil, replication_factor: nil, config: {}) ⇒ Object

Raises:

  • (Kazoo::Error)

250
251
252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/kazoo/topic.rb', line 250

# Builds a new topic with the requested number of partitions and saves it.
#
# @param cluster passed to the constructor and to Kazoo::ReplicaAssigner.new
# @param name the name of the topic to create
# @param partitions [Integer, String] number of partitions to create; must be
#   convertible with Kernel#Integer
# @param replication_factor passed unchanged to the replica assigner's #assign
#   once per partition
# @param config [Hash] initial topic configuration, passed to the constructor
# @return the newly saved topic
# @raise [Kazoo::Error] if the topic already exists
# @raise [ArgumentError] if +partitions+ cannot be converted to an Integer
def self.create(cluster, name, partitions: nil, replication_factor: nil, config: {})
  topic = new(cluster, name, config: config, partitions: [])
  raise Kazoo::Error, "Topic #{name} already exists" if topic.exists?

  # The nil default (or any other non-Integer) used to blow up with a
  # NoMethodError on #times; coerce once and report a meaningful error instead.
  partition_count =
    begin
      Integer(partitions)
    rescue ArgumentError, TypeError
      raise ArgumentError, "partitions must be an integer, got #{partitions.inspect}"
    end

  replica_assigner = Kazoo::ReplicaAssigner.new(cluster)

  partition_count.times do
    replicas = replica_assigner.assign(replication_factor)
    topic.append_partition(replicas: replicas)
  end

  topic.save
  topic
end

Instance Method Details

#add_partitions(partitions: nil, replication_factor: nil) ⇒ Object

Raises:

  • (ArgumentError)


78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/kazoo/topic.rb', line 78

# Appends new partitions to an existing topic and writes the enlarged
# partition list to Zookeeper.
#
# @param partitions [Integer, String] number of partitions to add; must
#   convert to a positive Integer
# @param replication_factor [Integer, String] handed to the replica assigner
#   for every new partition; must convert to a positive Integer
# @raise [ArgumentError] if either argument is missing, not numeric, or <= 0
# @raise [Kazoo::TopicNotFound] if the topic does not exist
def add_partitions(partitions: nil, replication_factor: nil)
  # Coerce once and keep the coerced value: the previous code validated
  # Integer(partitions) but then called #times on the raw argument, so "3"
  # passed validation and failed later, and nil raised TypeError rather than
  # the documented ArgumentError.
  positive_integer = lambda do |value, label|
    number =
      begin
        Integer(value)
      rescue ArgumentError, TypeError
        nil
      end
    raise ArgumentError, "#{label} must be a positive integer" unless number && number > 0
    number
  end

  partition_count = positive_integer.call(partitions, "partitions")
  replica_count = positive_integer.call(replication_factor, "replication_factor")

  raise Kazoo::TopicNotFound, "The topic #{name} does not exists!" unless exists?

  replica_assigner = Kazoo::ReplicaAssigner.new(cluster)

  partition_count.times do
    replicas = replica_assigner.assign(replica_count)
    append_partition(replicas: replicas)
  end

  validate
  write_partitions_to_zookeeper
  wait_for_partitions
  # NOTE(review): the listing was truncated after "cluster."; reset_metadata is
  # the presumed call — confirm against lib/kazoo/topic.rb.
  cluster.reset_metadata
end

#append_partition(**kwargs) ⇒ Object



29
30
31
32
33
# File 'lib/kazoo/topic.rb', line 29

# Creates a partition whose index is the current partition count, adds it to
# the end of #partitions and returns it.
#
# @param kwargs forwarded unchanged to #partition
# @return the partition that was appended
def append_partition(**kwargs)
  next_index = partitions.length
  partition(next_index, **kwargs).tap { |added| partitions << added }
end

#config ⇒ Object



143
144
145
# File 'lib/kazoo/topic.rb', line 143

# Returns the topic configuration, loading it through
# #load_config_from_zookeeper the first time it is needed and caching it.
def config
  return @config if @config

  @config = load_config_from_zookeeper
end

#config=(hash) ⇒ Object



147
148
149
150
# File 'lib/kazoo/topic.rb', line 147

# Replaces the cached configuration with a copy of +hash+ whose keys and
# values have all been converted with #to_s. A nil argument is ignored and
# leaves the current configuration untouched.
#
# @param hash [Hash, nil] new configuration
def config=(hash)
  unless hash.nil?
    @config = hash.each_with_object({}) do |(key, value), stringified|
      stringified[key.to_s] = value.to_s
    end
  end
end

#delete_config(key) ⇒ Object



157
158
159
160
# File 'lib/kazoo/topic.rb', line 157

# Removes one entry from the topic configuration and writes the result to
# Zookeeper.
#
# @param key the configuration key; converted with #to_s before the lookup
# @return whatever #write_config_to_zookeeper returns
def delete_config(key)
  settings = config
  settings.delete(key.to_s)

  write_config_to_zookeeper
end

#destroy ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/kazoo/topic.rb', line 108

# Marks the topic for deletion and blocks until its Zookeeper node is deleted.
#
# A watcher is registered on /brokers/topics/<name>, then the node
# /admin/delete_topics/<name> is created. The calling thread is stopped until
# the watcher reports a delete event.
#
# @raise [Kazoo::TopicNotFound] if the topic node does not exist
# @raise [Kazoo::Error] if the watcher cannot be set, the topic is already
#   marked for deletion, or the deletion marker cannot be created
def destroy
  t = Thread.current
  # Wakes the calling thread once the topic node is deleted.
  cb = Zookeeper::Callbacks::WatcherCallback.create do |event|
    case event.type
    when Zookeeper::Constants::ZOO_DELETED_EVENT
      t.run if t.status == 'sleep'
    else
      raise Kazoo::Error, "Unexpected Zookeeper event: #{event.type}"
    end
  end

  result = cluster.zk.stat(path: "/brokers/topics/#{name}", watcher: cb)
  case result.fetch(:rc)
  when Zookeeper::Constants::ZOK
    # continue
  when Zookeeper::Constants::ZNONODE
    raise Kazoo::TopicNotFound, "Topic #{name} does not exist!"
  else
    raise Kazoo::Error, "Failed to monitor topic"
  end

  result = cluster.zk.create(path: "/admin/delete_topics/#{name}")
  case result.fetch(:rc)
  when Zookeeper::Constants::ZOK
    # NOTE(review): if the watcher fires between the completed? check and
    # Thread.stop, nothing appears to wake this thread again — verify.
    Thread.stop unless cb.completed?
  when Zookeeper::Constants::ZNODEEXISTS
    raise Kazoo::Error, "The topic #{name} is already marked for deletion!"
  else
    raise Kazoo::Error, "Failed to delete topic #{name}. Error code: #{result.fetch(:rc)}"
  end

  # NOTE(review): the listing was truncated after "cluster."; reset_metadata is
  # the presumed call — confirm against lib/kazoo/topic.rb.
  cluster.reset_metadata
end

#eql?(other) ⇒ Boolean Also known as: ==

Returns:

  • (Boolean)


47
48
49
# File 'lib/kazoo/topic.rb', line 47

# Two topics are equal when +other+ is a Kazoo::Topic with an equal cluster
# and an equal name.
#
# @param other the object to compare with
# @return [Boolean]
def eql?(other)
  return false unless other.kind_of?(Kazoo::Topic)

  cluster == other.cluster && name == other.name
end

#exists? ⇒ Boolean

Returns:

  • (Boolean)


57
58
59
60
# File 'lib/kazoo/topic.rb', line 57

# Asks Zookeeper whether the node /brokers/topics/<name> exists.
#
# @return [Boolean] the answer of the returned stat object's #exists?
def exists?
  topic_path = "/brokers/topics/#{name}"
  cluster.zk.stat(path: topic_path).fetch(:stat).exists?
end

#hash ⇒ Object



53
54
55
# File 'lib/kazoo/topic.rb', line 53

# Hash code derived from the cluster and the name — the same two fields that
# #eql? compares.
def hash
  [cluster, name].hash
end

#inspect ⇒ Object



43
44
45
# File 'lib/kazoo/topic.rb', line 43

# Short, human-readable representation that shows only the topic name.
def inspect
  "#<Kazoo::Topic #{name}>"
end

#load_config_from_zookeeper ⇒ Object



195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/kazoo/topic.rb', line 195

# Reads the node /config/topics/<name> from Zookeeper.
#
# @return [Hash] an empty Hash when the node does not exist; otherwise the
#   configuration obtained by applying the node's data with
#   #set_config_from_json
# @raise [Kazoo::Error] for any return code other than ZOK or ZNONODE
def load_config_from_zookeeper
  response = cluster.zk.get(path: "/config/topics/#{name}")

  case response.fetch(:rc)
  when Zookeeper::Constants::ZNONODE
    {}
  when Zookeeper::Constants::ZOK
    set_config_from_json(response.fetch(:data)).config
  else
    raise Kazoo::Error, "Failed to retrieve topic config"
  end
end

#load_partitions_from_zookeeper ⇒ Object

Raises:

  • (Kazoo::Error)

189
190
191
192
193
# File 'lib/kazoo/topic.rb', line 189

# Reads the node /brokers/topics/<name> from Zookeeper and rebuilds the
# partition list from its data via #set_partitions_from_json.
#
# @return the resulting #partitions
# @raise [Kazoo::Error] if the return code is not ZOK
def load_partitions_from_zookeeper
  response = cluster.zk.get(path: "/brokers/topics/#{name}")
  status = response.fetch(:rc)

  unless status == Zookeeper::Constants::ZOK
    raise Kazoo::Error, "Failed to get list of partitions for #{name}. Result code: #{status}"
  end

  set_partitions_from_json(response.fetch(:data)).partitions
end

#partition(index, **kwargs) ⇒ Object



25
26
27
# File 'lib/kazoo/topic.rb', line 25

# Builds a Kazoo::Partition that belongs to this topic. The new partition is
# only returned; this method does not add it to #partitions.
#
# @param index passed to Kazoo::Partition.new as the second argument
# @param kwargs forwarded unchanged to Kazoo::Partition.new
def partition(index, **kwargs)
  Kazoo::Partition.new(self, index, **kwargs)
end

#partitions ⇒ Object



17
18
19
# File 'lib/kazoo/topic.rb', line 17

# Returns the partition list, loading it through
# #load_partitions_from_zookeeper the first time it is needed and caching it.
def partitions
  return @partitions if @partitions

  @partitions = load_partitions_from_zookeeper
end

#partitions=(ps) ⇒ Object



21
22
23
# File 'lib/kazoo/topic.rb', line 21

# Replaces the cached partition list. Assigning nil clears the cache, so the
# next call to #partitions loads the list again.
#
# @param ps the new partition list, or nil
def partitions=(ps)
  @partitions = ps
end

#replication_factor ⇒ Object



35
36
37
# File 'lib/kazoo/topic.rb', line 35

# The smallest replication factor found among this topic's partitions.
#
# @return the minimum of the partitions' replication factors, or nil when
#   there are no partitions
def replication_factor
  factors = partitions.map { |part| part.replication_factor }
  factors.min
end

#reset_default_config ⇒ Object



162
163
164
165
# File 'lib/kazoo/topic.rb', line 162

# Replaces the topic configuration with an empty Hash and writes it to
# Zookeeper.
#
# @return whatever #write_config_to_zookeeper returns
def reset_default_config
  @config = {}
  write_config_to_zookeeper
end

#save ⇒ Object

Raises:

  • (Kazoo::Error)

97
98
99
100
101
102
103
104
105
106
# File 'lib/kazoo/topic.rb', line 97

# Persists a new topic: validates it, writes its configuration and partition
# list to Zookeeper, then waits for the partitions.
#
# @raise [Kazoo::Error] if the topic already exists
# @raise [Kazoo::ValidationError] if #validate rejects the topic
def save
  raise Kazoo::Error, "The topic #{name} already exists!" if exists?
  validate

  write_config_to_zookeeper
  write_partitions_to_zookeeper

  wait_for_partitions
  # NOTE(review): the listing was truncated after "cluster."; reset_metadata is
  # the presumed call — confirm against lib/kazoo/topic.rb.
  cluster.reset_metadata
end

#set_config(key, value) ⇒ Object



152
153
154
155
# File 'lib/kazoo/topic.rb', line 152

# Sets one configuration entry and writes the configuration to Zookeeper.
#
# @param key the configuration key; stored as key.to_s
# @param value the configuration value; stored as value.to_s
# @return whatever #write_config_to_zookeeper returns
def set_config(key, value)
  config.store(key.to_s, value.to_s)

  write_config_to_zookeeper
end

#set_config_from_json(json_payload) ⇒ Object



180
181
182
183
184
185
186
187
# File 'lib/kazoo/topic.rb', line 180

# Parses a JSON document with the keys "version" and "config" and stores the
# "config" value as this topic's configuration.
#
# @param json_payload [String] JSON text
# @return [self]
# @raise [Kazoo::VersionNotSupported] if "version" is not 1
# @raise [KeyError] if "version" or "config" is missing
def set_config_from_json(json_payload)
  parsed = JSON.parse(json_payload)
  version = parsed.fetch('version')
  raise Kazoo::VersionNotSupported unless version == 1

  @config = parsed.fetch('config')
  self
end

#set_partitions_from_json(json_payload) ⇒ Object



167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/kazoo/topic.rb', line 167

# Parses a JSON document with the keys "version" and "partitions" and rebuilds
# the partition list from it. Each "partitions" entry pairs a partition id
# with a list of broker ids; every broker id is looked up in cluster.brokers.
# The resulting list is sorted by partition id.
#
# @param json_payload [String] JSON text
# @return [self]
# @raise [Kazoo::VersionNotSupported] if "version" is not 1
# @raise [KeyError] if "version" or "partitions" is missing
def set_partitions_from_json(json_payload)
  parsed = JSON.parse(json_payload)
  version = parsed.fetch('version')
  raise Kazoo::VersionNotSupported unless version == 1

  assignments = parsed.fetch('partitions')
  @partitions = assignments.map do |id, broker_ids|
    index = id.to_i
    replicas = broker_ids.map { |broker_id| cluster.brokers[broker_id] }
    partition(index, replicas: replicas)
  end
  @partitions.sort_by! { |part| part.id }

  self
end

#under_replicated? ⇒ Boolean

Returns:

  • (Boolean)


39
40
41
# File 'lib/kazoo/topic.rb', line 39

# Whether at least one of this topic's partitions reports being
# under-replicated.
#
# @return [Boolean]
def under_replicated?
  partitions.any? { |part| part.under_replicated? }
end

#valid? ⇒ Boolean

Returns:

  • (Boolean)


72
73
74
75
76
# File 'lib/kazoo/topic.rb', line 72

# Non-raising variant of #validate.
#
# @return [Boolean] the result of #validate, or false when it raises
#   Kazoo::ValidationError
def valid?
  begin
    validate
  rescue Kazoo::ValidationError
    false
  end
end

#validate ⇒ Object



62
63
64
65
66
67
68
69
70
# File 'lib/kazoo/topic.rb', line 62

# Checks that the topic can be written: the name must match
# VALID_TOPIC_NAMES, must not match BLACKLISTED_TOPIC_NAMES, must be at most
# 255 characters long, and there must be at least one partition. Every
# partition is then validated in turn.
#
# @return [true]
# @raise [Kazoo::ValidationError] when one of the checks above fails
def validate
  invalid_name = !(VALID_TOPIC_NAMES =~ name) || BLACKLISTED_TOPIC_NAMES =~ name
  raise Kazoo::ValidationError, "#{name} is not a valid topic name" if invalid_name
  raise Kazoo::ValidationError, "#{name} is too long" if name.length > 255
  raise Kazoo::ValidationError, "The topic has no partitions defined" if partitions.length.zero?

  partitions.each { |part| part.validate }
  true
end

#write_config_to_zookeeper ⇒ Object

Raises:

  • (Kazoo::Error)

227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
# File 'lib/kazoo/topic.rb', line 227

# Writes the topic configuration to /config/topics/<name> (creating the node
# when it does not exist yet) and then creates a sequential
# /config/changes/config_change_ node whose data is the inspected topic name.
#
# @raise [Kazoo::Error] if the configuration or the change notification
#   cannot be written
def write_config_to_zookeeper
  config_hash = config.inject({}) { |h, (k,v)| h[k.to_s] = v.to_s; h }
  config_json = JSON.generate(version: 1, config: config_hash)

  # Set topic config
  result = cluster.zk.set(path: "/config/topics/#{name}", data: config_json)
  case result.fetch(:rc)
  when Zookeeper::Constants::ZOK
    # continue
  when Zookeeper::Constants::ZNONODE
    # The node does not exist yet, so create it instead of updating it.
    result = cluster.zk.create(path: "/config/topics/#{name}", data: config_json)
    raise Kazoo::Error, "Failed to write topic config to zookeeper. Result code: #{result.fetch(:rc)}" unless result.fetch(:rc) == Zookeeper::Constants::ZOK
  else
    raise Kazoo::Error, "Failed to write topic config to zookeeper. Result code: #{result.fetch(:rc)}"
  end

  # Set config change notification
  result = cluster.zk.create(path: "/config/changes/config_change_", data: name.inspect, sequence: true)
  raise Kazoo::Error, "Failed to set topic config change notification" unless result.fetch(:rc) == Zookeeper::Constants::ZOK

  # NOTE(review): the listing was truncated after "cluster."; reset_metadata is
  # the presumed call — confirm against lib/kazoo/topic.rb.
  cluster.reset_metadata
end

#write_partitions_to_zookeeper ⇒ Object



209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# File 'lib/kazoo/topic.rb', line 209

# Writes the partition list to /brokers/topics/<name>, creating the node when
# it does not exist yet.
#
# @raise [Kazoo::Error] if the node can be neither updated nor created
def write_partitions_to_zookeeper
  path = "/brokers/topics/#{name}"
  data = JSON.generate(version: 1, partitions: partitions_as_json)

  result = cluster.zk.set(path: path, data: data)
  case result.fetch(:rc)
  when Zookeeper::Constants::ZOK
    # continue
  when Zookeeper::Constants::ZNONODE
    # The node does not exist yet, so create it instead of updating it.
    result = cluster.zk.create(path: path, data: data)
    raise Kazoo::Error, "Failed to write partitions to zookeeper. Result code: #{result.fetch(:rc)}" unless result.fetch(:rc) == Zookeeper::Constants::ZOK
  else
    raise Kazoo::Error, "Failed to write partitions to zookeeper. Result code: #{result.fetch(:rc)}"
  end

  # NOTE(review): the listing was truncated after "cluster."; reset_metadata is
  # the presumed call — confirm against lib/kazoo/topic.rb.
  cluster.reset_metadata
end