Class: Kazoo::Topic

Inherits:
Object
  • Object
show all
Defined in:
lib/kazoo/topic.rb

Constant Summary collapse

VALID_TOPIC_NAMES =
%r{\A[a-zA-Z0-9\\._\\-]+\z}
BLACKLISTED_TOPIC_NAMES =
%r{\A\.\.?\z}

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(cluster, name) ⇒ Topic

Returns a new instance of Topic.



9
10
11
# File 'lib/kazoo/topic.rb', line 9

def initialize(cluster, name)
  @cluster, @name = cluster, name
end

Instance Attribute Details

#clusterObject (readonly)

Returns the value of attribute cluster.



6
7
8
# File 'lib/kazoo/topic.rb', line 6

def cluster
  @cluster
end

#nameObject (readonly)

Returns the value of attribute name.



6
7
8
# File 'lib/kazoo/topic.rb', line 6

def name
  @name
end

#partitionsObject



24
25
26
27
28
29
30
31
32
33
34
# File 'lib/kazoo/topic.rb', line 24

def partitions
  @partitions ||= begin
    result = cluster.zk.get(path: "/brokers/topics/#{name}")
    raise Kazoo::Error, "Failed to get list of partitions for #{name}. Result code: #{result.fetch(:rc)}" if result.fetch(:rc) != Zookeeper::Constants::ZOK

    partition_json = JSON.parse(result.fetch(:data))
    partition_json.fetch('partitions').map do |(id, replicas)|
      partition(id.to_i, replicas: replicas.map { |b| cluster.brokers[b] })
    end
  end
end

Class Method Details

.create(cluster, name, partitions: nil, replication_factor: nil) ⇒ Object



196
197
198
199
200
201
# File 'lib/kazoo/topic.rb', line 196

def self.create(cluster, name, partitions: nil, replication_factor: nil)
  topic = new(cluster, name)
  topic.send(:sequentially_assign_partitions, partitions, replication_factor)
  topic.create
  topic
end

.from_json(cluster, name, json) ⇒ Object



13
14
15
16
17
18
19
20
21
22
# File 'lib/kazoo/topic.rb', line 13

def self.from_json(cluster, name, json)
  raise Kazoo::VersionNotSupported unless json.fetch('version') == 1

  topic = new(cluster, name)
  topic.partitions = json.fetch('partitions').map do |(id, replicas)|
    topic.partition(id.to_i, replicas: replicas.map { |b| cluster.brokers[b] })
  end.sort_by(&:id)

  return topic
end

Instance Method Details

#configObject



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/kazoo/topic.rb', line 141

def config
  result = cluster.zk.get(path: "/config/topics/#{name}")
  case result.fetch(:rc)
  when Zookeeper::Constants::ZOK
    # continue
  when Zookeeper::Constants::ZNONODE
    raise Kazoo::TopicNotFound, "Topic #{name} does not exist!"
  else
    raise Kazoo::Error, "Failed to set topic config"
  end

  config = JSON.parse(result.fetch(:data))
  raise Kazoo::VersionNotSupported if config.fetch('version') != 1

  config.fetch('config')
end

#createObject

Raises:



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/kazoo/topic.rb', line 83

def create
  raise Kazoo::Error, "The topic #{name} already exists!" if exists?
  validate

  result = cluster.zk.create(
    path: "/config/topics/#{name}",
    data: JSON.generate(version: 1, config: {})
  )

  if result.fetch(:rc) != Zookeeper::Constants::ZOK
    raise Kazoo::Error, "Failed to create topic config node for #{name}. Error code: #{result.fetch(:rc)}"
  end

  result = cluster.zk.create(
    path: "/brokers/topics/#{name}",
    data: JSON.generate(version: 1, partitions: partitions_as_json)
  )

  if result.fetch(:rc) != Zookeeper::Constants::ZOK
    raise Kazoo::Error, "Failed to create topic #{name}. Error code: #{result.fetch(:rc)}"
  end

  wait_for_partitions
end

#delete_config(key) ⇒ Object



164
165
166
167
168
# File 'lib/kazoo/topic.rb', line 164

def delete_config(key)
  new_config = config
  new_config.delete(key.to_s)
  write_config(new_config)
end

#destroyObject



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/kazoo/topic.rb', line 108

def destroy
  t = Thread.current
  cb = Zookeeper::Callbacks::WatcherCallback.create do |event|
    case event.type
    when Zookeeper::Constants::ZOO_DELETED_EVENT
      t.run if t.status == 'sleep'
    else
      raise Kazoo::Error, "Unexpected Zookeeper event: #{event.type}"
    end
  end

  result = cluster.zk.stat(path: "/brokers/topics/#{name}", watcher: cb)
  case result.fetch(:rc)
  when Zookeeper::Constants::ZOK
    # continue
  when Zookeeper::Constants::ZNONODE
    raise Kazoo::TopicNotFound, "Topic #{name} does not exist!"
  else
    raise Kazoo::Error, "Failed to monitor topic"
  end


  result = cluster.zk.create(path: "/admin/delete_topics/#{name}")
  case result.fetch(:rc)
  when Zookeeper::Constants::ZOK
    Thread.stop unless cb.completed?
  when Zookeeper::Constants::ZNODEEXISTS
    raise Kazoo::Error, "The topic #{name} is already marked for deletion!"
  else
    raise Kazoo::Error, "Failed to delete topic #{name}. Error code: #{result.fetch(:rc)}"
  end
end

#eql?(other) ⇒ Boolean Also known as: ==

Returns:

  • (Boolean)


52
53
54
# File 'lib/kazoo/topic.rb', line 52

def eql?(other)
  other.kind_of?(Kazoo::Topic) && cluster == other.cluster && name == other.name
end

#exists?Boolean

Returns:

  • (Boolean)


62
63
64
65
# File 'lib/kazoo/topic.rb', line 62

def exists?
  stat = cluster.zk.stat(path: "/brokers/topics/#{name}")
  stat.fetch(:stat).exists?
end

#hashObject



58
59
60
# File 'lib/kazoo/topic.rb', line 58

def hash
  [cluster, name].hash
end

#inspectObject



48
49
50
# File 'lib/kazoo/topic.rb', line 48

def inspect
  "#<Kazoo::Topic #{name}>"
end

#partition(*args) ⇒ Object



36
37
38
# File 'lib/kazoo/topic.rb', line 36

def partition(*args)
  Kazoo::Partition.new(self, *args)
end

#replication_factorObject



40
41
42
# File 'lib/kazoo/topic.rb', line 40

def replication_factor
  partitions.map(&:replication_factor).min
end

#reset_default_configObject



170
171
172
# File 'lib/kazoo/topic.rb', line 170

def reset_default_config
  write_config({})
end

#set_config(key, value) ⇒ Object



158
159
160
161
162
# File 'lib/kazoo/topic.rb', line 158

def set_config(key, value)
  new_config = config
  new_config[key.to_s] = value.to_s
  write_config(new_config)
end

#under_replicated?Boolean

Returns:

  • (Boolean)


44
45
46
# File 'lib/kazoo/topic.rb', line 44

def under_replicated?
  partitions.any?(:under_replicated?)
end

#valid?Boolean

Returns:

  • (Boolean)


77
78
79
80
81
# File 'lib/kazoo/topic.rb', line 77

def valid?
  validate
rescue Kazoo::ValidationError
  false
end

#validateObject



67
68
69
70
71
72
73
74
75
# File 'lib/kazoo/topic.rb', line 67

def validate
  raise Kazoo::ValidationError, "#{name} is not a valid topic name" if VALID_TOPIC_NAMES !~ name
  raise Kazoo::ValidationError, "#{name} is not a valid topic name" if BLACKLISTED_TOPIC_NAMES =~ name
  raise Kazoo::ValidationError, "#{name} is too long" if name.length > 255
  raise Kazoo::ValidationError, "The topic has no partitions defined" if partitions.length == 0
  partitions.each(&:validate)

  true
end

#write_config(config_hash) ⇒ Object



174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/kazoo/topic.rb', line 174

def write_config(config_hash)
  raise Kazoo::TopicNotFound, "Topic #{name.inspect} does not exist" unless exists?

  config = config_hash.inject({}) { |h, (k,v)| h[k.to_s] = v.to_s; h }
  config_json = JSON.generate(version: 1, config: config)

  # Set topic config
  result = cluster.zk.set(path: "/config/topics/#{name}", data: config_json)
  case result.fetch(:rc)
  when Zookeeper::Constants::ZOK
    # continue
  when Zookeeper::Constants::ZNONODE
    raise Kazoo::TopicNotFound, "Topic #{name} does not exist!"
  else
    raise Kazoo::Error, "Failed to set topic config"
  end

  # Set config change notification
  result = cluster.zk.create(path: "/config/changes/config_change_", data: name.inspect, sequence: true)
  raise Kazoo::Error, "Failed to set topic config change notification" unless result.fetch(:rc) == Zookeeper::Constants::ZOK
end