Class: Kazoo::Topic

Inherits:
Object
  • Object
show all
Defined in:
lib/kazoo/topic.rb

Constant Summary collapse

VALID_TOPIC_NAMES =
%r{\A[a-zA-Z0-9\\._\\-]+\z}
BLACKLISTED_TOPIC_NAMES =
%r{\A\.\.?\z}

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(cluster, name) ⇒ Topic

Returns a new instance of Topic.



9
10
11
# File 'lib/kazoo/topic.rb', line 9

def initialize(cluster, name)
  @cluster, @name = cluster, name
end

Instance Attribute Details

#clusterObject (readonly)

Returns the value of attribute cluster.



6
7
8
# File 'lib/kazoo/topic.rb', line 6

def cluster
  @cluster
end

#nameObject (readonly)

Returns the value of attribute name.



6
7
8
# File 'lib/kazoo/topic.rb', line 6

def name
  @name
end

#partitionsObject



22
23
24
25
26
27
28
29
30
31
32
# File 'lib/kazoo/topic.rb', line 22

def partitions
  @partitions ||= begin
    result = cluster.zk.get(path: "/brokers/topics/#{name}")
    raise Kazoo::Error, "Failed to get list of partitions for #{name}. Result code: #{result.fetch(:rc)}" if result.fetch(:rc) != Zookeeper::Constants::ZOK

    partition_json = JSON.parse(result.fetch(:data))
    partition_json.fetch('partitions').map do |(id, replicas)|
      partition(id.to_i, replicas: replicas.map { |b| cluster.brokers[b] })
    end
  end
end

Class Method Details

.create(cluster, name, partitions: nil, replication_factor: nil) ⇒ Object



130
131
132
133
134
135
# File 'lib/kazoo/topic.rb', line 130

def self.create(cluster, name, partitions: nil, replication_factor: nil)
  topic = new(cluster, name)
  topic.send(:sequentially_assign_partitions, partitions, replication_factor)
  topic.create
  topic
end

.from_json(cluster, name, json) ⇒ Object



13
14
15
16
17
18
19
20
# File 'lib/kazoo/topic.rb', line 13

def self.from_json(cluster, name, json)
  topic = new(cluster, name)
  topic.partitions = json.fetch('partitions').map do |(id, replicas)|
    topic.partition(id.to_i, replicas: replicas.map { |b| cluster.brokers[b] })
  end.sort_by(&:id)

  return topic
end

Instance Method Details

#createObject

Raises:



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/kazoo/topic.rb', line 81

def create
  raise Kazoo::Error, "The topic #{name} already exists!" if exists?
  validate

  result = cluster.zk.create(
    path: "/brokers/topics/#{name}",
    data: JSON.dump(version: 1, partitions: partitions_as_json)
  )

  if result.fetch(:rc) != Zookeeper::Constants::ZOK
    raise Kazoo::Error, "Failed to create topic #{name}. Error code: #{result.fetch(:rc)}"
  end

  wait_for_partitions
end

#destroyObject



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/kazoo/topic.rb', line 97

def destroy
  t = Thread.current
  cb = Zookeeper::Callbacks::WatcherCallback.create do |event|
    case event.type
    when Zookeeper::Constants::ZOO_DELETED_EVENT
      t.run if t.status == 'sleep'
    else
      raise Kazoo::Error, "Unexpected Zookeeper event: #{event.type}"
    end
  end

  result = cluster.zk.stat(path: "/brokers/topics/#{name}", watcher: cb)
  case result.fetch(:rc)
   when Zookeeper::Constants::ZOK
    # continue
  when Zookeeper::Constants::ZNONODE
    raise Kazoo::Error, "Topic #{name} does not exist!"
  else
    raise Kazoo::Error, "Failed to monitor topic"
  end


  result = cluster.zk.create(path: "/admin/delete_topics/#{name}")
  case result.fetch(:rc)
  when Zookeeper::Constants::ZOK
    Thread.stop unless cb.completed?
  when Zookeeper::Constants::ZNODEEXISTS
    raise Kazoo::Error, "The topic #{name} is already marked for deletion!"
  else
    raise Kazoo::Error, "Failed to delete topic #{name}. Error code: #{result.fetch(:rc)}"
  end
end

#eql?(other) ⇒ Boolean Also known as: ==

Returns:

  • (Boolean)


50
51
52
# File 'lib/kazoo/topic.rb', line 50

def eql?(other)
  other.kind_of?(Kazoo::Topic) && cluster == other.cluster && name == other.name
end

#exists?Boolean

Returns:

  • (Boolean)


60
61
62
63
# File 'lib/kazoo/topic.rb', line 60

def exists?
  stat = cluster.zk.stat(path: "/brokers/topics/#{name}")
  stat.fetch(:stat).exists?
end

#hashObject



56
57
58
# File 'lib/kazoo/topic.rb', line 56

def hash
  [cluster, name].hash
end

#inspectObject



46
47
48
# File 'lib/kazoo/topic.rb', line 46

def inspect
  "#<Kazoo::Topic #{name}>"
end

#partition(*args) ⇒ Object



34
35
36
# File 'lib/kazoo/topic.rb', line 34

def partition(*args)
  Kazoo::Partition.new(self, *args)
end

#replication_factorObject



38
39
40
# File 'lib/kazoo/topic.rb', line 38

def replication_factor
  partitions.map(&:replication_factor).min
end

#under_replicated?Boolean

Returns:

  • (Boolean)


42
43
44
# File 'lib/kazoo/topic.rb', line 42

def under_replicated?
  partitions.any?(:under_replicated?)
end

#valid?Boolean

Returns:

  • (Boolean)


75
76
77
78
79
# File 'lib/kazoo/topic.rb', line 75

def valid?
  validate
rescue Kazoo::ValidationError
  false
end

#validateObject



65
66
67
68
69
70
71
72
73
# File 'lib/kazoo/topic.rb', line 65

def validate
  raise Kazoo::ValidationError, "#{name} is not a valid topic name" if VALID_TOPIC_NAMES !~ name
  raise Kazoo::ValidationError, "#{name} is not a valid topic name" if BLACKLISTED_TOPIC_NAMES =~ name
  raise Kazoo::ValidationError, "#{name} is too long" if name.length > 255
  raise Kazoo::ValidationError, "The topic has no partitions defined" if partitions.length == 0
  partitions.each(&:validate)

  true
end