Class: Kazoo::Topic

Inherits:
Object
  • Object
show all
Defined in:
lib/kazoo/topic.rb

Constant Summary collapse

# Whole-string pattern for acceptable topic names: one or more ASCII letters,
# digits, '.', '_' or '-'. Inside a Ruby regexp character class '.' needs no
# escape and a trailing '-' is literal, so no backslashes are used here; a
# doubled backslash would add a literal backslash to the accepted characters.
VALID_TOPIC_NAMES =
%r{\A[a-zA-Z0-9._-]+\z}
# Names that pass the character check but are still rejected: exactly "." or
# exactly "..".
BLACKLISTED_TOPIC_NAMES =
%r{\A\.\.?\z}

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(cluster, name) ⇒ Topic

Returns a new instance of Topic.



9
10
11
12
# File 'lib/kazoo/topic.rb', line 9

# Builds a topic handle for +name+ on +cluster+. No partitions are attached
# yet; the partition list starts out as an empty Array.
#
# @param cluster [Object] stored as-is in @cluster
# @param name [Object] stored as-is in @name
def initialize(cluster, name)
  @cluster = cluster
  @name = name
  @partitions = []
end

Instance Attribute Details

#cluster ⇒ Object (readonly)

Returns the value of attribute cluster.



6
7
8
# File 'lib/kazoo/topic.rb', line 6

# Reader for the cluster this topic was constructed with.
#
# @return [Object] the value held in @cluster (set from the +cluster+
#   argument of #initialize)
def cluster
  @cluster
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



6
7
8
# File 'lib/kazoo/topic.rb', line 6

# Reader for the topic name.
#
# @return [Object] the value held in @name (set from the +name+ argument of
#   #initialize); the other methods interpolate it into Zookeeper paths
def name
  @name
end

#partitions ⇒ Object

Returns the value of attribute partitions.



7
8
9
# File 'lib/kazoo/topic.rb', line 7

# Reader for the topic's partition list.
#
# @return [Object] the value held in @partitions; #initialize sets it to an
#   empty Array and .from_json assigns a list sorted by partition id
def partitions
  @partitions
end

Class Method Details

.create(cluster, name, partitions: nil, replication_factor: nil) ⇒ Object



119
120
121
122
123
124
# File 'lib/kazoo/topic.rb', line 119

# Instantiates a topic, assigns its partitions, and creates it.
#
# The new instance is asked (via +send+) to run
# +sequentially_assign_partitions+ with the given counts, and then #create is
# invoked on it.
#
# @param cluster [Object] passed through to +new+
# @param name [Object] passed through to +new+
# @param partitions [Object, nil] forwarded to +sequentially_assign_partitions+
# @param replication_factor [Object, nil] forwarded to
#   +sequentially_assign_partitions+
# @return [Object] the topic instance that was built
def self.create(cluster, name, partitions: nil, replication_factor: nil)
  new(cluster, name).tap do |topic|
    topic.send(:sequentially_assign_partitions, partitions, replication_factor)
    topic.create
  end
end

.from_json(cluster, name, json) ⇒ Object



14
15
16
17
18
19
20
21
# File 'lib/kazoo/topic.rb', line 14

# Builds a topic from a parsed JSON document.
#
# The 'partitions' entry of +json+ is iterated as (id, broker ids) pairs. Each
# broker id is looked up in +cluster.brokers+ and the id is converted with
# +to_i+ before the partition is built through #partition. The resulting list
# is sorted by partition id and assigned to the topic.
#
# @param cluster [Object] must respond to +brokers+, which is indexed with +[]+
# @param name [Object] the topic name
# @param json [#fetch] must contain a 'partitions' key (+fetch+ raises if not)
# @return [Object] the populated topic
def self.from_json(cluster, name, json)
  topic = new(cluster, name)

  built = json.fetch('partitions').map do |id, broker_ids|
    replicas = broker_ids.map { |broker_id| cluster.brokers[broker_id] }
    topic.partition(id.to_i, replicas: replicas)
  end

  topic.partitions = built.sort_by(&:id)
  topic
end

Instance Method Details

#create ⇒ Object

Raises:

  • (Kazoo::Error)



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/kazoo/topic.rb', line 70

# Creates the topic's node in Zookeeper.
#
# Refuses to proceed when the topic already exists, runs #validate, then
# writes a JSON document (+version: 1+ plus +partitions_as_json+) to
# "/brokers/topics/<name>". Finishes by calling +wait_for_partitions+.
#
# @return [Object] whatever +wait_for_partitions+ returns
# @raise [Kazoo::Error] if the topic already exists, or if the Zookeeper
#   create call returns a code other than ZOK
def create
  raise Kazoo::Error, "The topic #{name} already exists!" if exists?
  validate

  response = cluster.zk.create(
    path: "/brokers/topics/#{name}",
    data: JSON.dump(version: 1, partitions: partitions_as_json)
  )

  rc = response.fetch(:rc)
  unless rc == Zookeeper::Constants::ZOK
    raise Kazoo::Error, "Failed to create topic #{name}. Error code: #{rc}"
  end

  wait_for_partitions
end

#destroy ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/kazoo/topic.rb', line 86

# Requests deletion of this topic and waits for its node to disappear.
#
# Steps, in order:
# 1. Register a watcher on "/brokers/topics/<name>" through a +stat+ call.
# 2. Create the marker node "/admin/delete_topics/<name>".
# 3. Put the calling thread to sleep (+Thread.stop+) unless the watcher
#    callback has already completed; the callback wakes the thread when it
#    sees a ZOO_DELETED_EVENT.
#
# The watcher has to be registered before the marker node is created, so the
# statement order below matters.
#
# NOTE(review): if the callback runs between the +cb.completed?+ check and
# +Thread.stop+, the thread's status would not be 'sleep' and +t.run+ would be
# skipped — this looks like a possible lost wake-up; confirm against the
# Zookeeper client's callback threading.
#
# @raise [Kazoo::Error] if the topic node does not exist, the watch cannot be
#   set, the topic is already marked for deletion, or the marker node cannot
#   be created
def destroy
  # Remember the caller so the watcher callback can wake it up later.
  t = Thread.current
  cb = Zookeeper::Callbacks::WatcherCallback.create do |event|
    case event.type
    when Zookeeper::Constants::ZOO_DELETED_EVENT
      # Only wake the caller if it is actually parked in Thread.stop.
      t.run if t.status == 'sleep'
    else
      # NOTE(review): this raise happens inside the callback block; which
      # thread executes it is not visible here — confirm it reaches the caller.
      raise Kazoo::Error, "Unexpected Zookeeper event: #{event.type}"
    end
  end

  # stat doubles as the existence check and as the way to attach the watcher.
  result = cluster.zk.stat(path: "/brokers/topics/#{name}", watcher: cb)
  case result.fetch(:rc)
   when Zookeeper::Constants::ZOK
    # Node exists and the watcher is in place; fall through to the delete request.
  when Zookeeper::Constants::ZNONODE
    raise Kazoo::Error, "Topic #{name} does not exist!"
  else
    raise Kazoo::Error, "Failed to monitor topic"
  end


  # Creating this marker node is the actual deletion request.
  result = cluster.zk.create(path: "/admin/delete_topics/#{name}")
  case result.fetch(:rc)
  when Zookeeper::Constants::ZOK
    # Sleep until the watcher fires, unless it already has.
    Thread.stop unless cb.completed?
  when Zookeeper::Constants::ZNODEEXISTS
    raise Kazoo::Error, "The topic #{name} is already marked for deletion!"
  else
    raise Kazoo::Error, "Failed to delete topic #{name}. Error code: #{result.fetch(:rc)}"
  end
end

#eql?(other) ⇒ Boolean Also known as: ==

Returns:

  • (Boolean)


39
40
41
# File 'lib/kazoo/topic.rb', line 39

# Two topics are equal when +other+ is a Kazoo::Topic with the same cluster
# and the same name.
#
# @param other [Object] the object to compare against
# @return [Boolean] false for anything that is not a Kazoo::Topic; otherwise
#   the result of comparing cluster and name
def eql?(other)
  return false unless other.kind_of?(Kazoo::Topic)

  cluster == other.cluster && name == other.name
end

#exists? ⇒ Boolean

Returns:

  • (Boolean)


49
50
51
52
# File 'lib/kazoo/topic.rb', line 49

# Checks Zookeeper for this topic's node.
#
# Issues a +stat+ call for "/brokers/topics/<name>" and asks the :stat entry
# of the response whether the node exists.
#
# @return [Boolean] result of +exists?+ on the returned stat object
def exists?
  cluster.zk
         .stat(path: "/brokers/topics/#{name}")
         .fetch(:stat)
         .exists?
end

#hash ⇒ Object



45
46
47
# File 'lib/kazoo/topic.rb', line 45

# Hash code derived from the same two fields that #eql? compares, so topics
# that are equal produce the same hash.
#
# @return [Integer] hash of the [cluster, name] pair
def hash
  identity = [cluster, name]
  identity.hash
end

#inspect ⇒ Object



35
36
37
# File 'lib/kazoo/topic.rb', line 35

# Short debugging representation that shows only the topic name.
#
# @return [String] e.g. "#<Kazoo::Topic events>"
def inspect
  topic_name = name
  "#<Kazoo::Topic #{topic_name}>"
end

#partition(*args) ⇒ Object



23
24
25
# File 'lib/kazoo/topic.rb', line 23

# Builds a Kazoo::Partition that belongs to this topic.
#
# Positional and keyword arguments are forwarded separately. .from_json calls
# this method with a +replicas:+ keyword; with a bare +*args+ signature, Ruby 3
# would collect that keyword into a positional Hash and hand it to
# Kazoo::Partition.new as an extra positional argument instead of a keyword.
#
# @param args [Array] positional arguments for Kazoo::Partition.new, placed
#   after the topic itself
# @param kwargs [Hash] keyword arguments for Kazoo::Partition.new
# @return [Kazoo::Partition]
def partition(*args, **kwargs)
  Kazoo::Partition.new(self, *args, **kwargs)
end

#replication_factor ⇒ Object



27
28
29
# File 'lib/kazoo/topic.rb', line 27

# Replication factor of the topic, taken as the smallest replication factor
# among its partitions.
#
# @return [Object, nil] the minimum value, or nil when there are no partitions
def replication_factor
  factors = partitions.map { |part| part.replication_factor }
  factors.min
end

#under_replicated? ⇒ Boolean

Returns:

  • (Boolean)


31
32
33
# File 'lib/kazoo/topic.rb', line 31

# Tells whether at least one partition of this topic is under-replicated.
#
# The predicate is passed as a block (+&:under_replicated?+). Passing the bare
# Symbol as a positional argument would make +any?+ treat it as a +===+
# pattern, which never matches a partition object.
#
# @return [Boolean] true if any partition answers true to +under_replicated?+,
#   false otherwise (including when there are no partitions)
def under_replicated?
  partitions.any?(&:under_replicated?)
end

#valid? ⇒ Boolean

Returns:

  • (Boolean)


64
65
66
67
68
# File 'lib/kazoo/topic.rb', line 64

# Non-raising counterpart of #validate.
#
# @return [Object] whatever #validate returns when it succeeds, or false when
#   it raises Kazoo::ValidationError; other exceptions propagate
def valid?
  outcome = validate
rescue Kazoo::ValidationError
  false
else
  outcome
end

#validate ⇒ Object



54
55
56
57
58
59
60
61
62
# File 'lib/kazoo/topic.rb', line 54

# Checks that the topic is well-formed, raising on the first problem found.
#
# Checks run in this order: the name must match VALID_TOPIC_NAMES, must not
# match BLACKLISTED_TOPIC_NAMES, must be at most 255 characters long, and the
# topic must have at least one partition. Each partition's own +validate+ is
# then called.
#
# @return [true] when every check passes
# @raise [Kazoo::ValidationError] on the first failed check
def validate
  unless VALID_TOPIC_NAMES =~ name
    raise Kazoo::ValidationError, "#{name} is not a valid topic name"
  end

  if BLACKLISTED_TOPIC_NAMES =~ name
    raise Kazoo::ValidationError, "#{name} is not a valid topic name"
  end

  if name.length > 255
    raise Kazoo::ValidationError, "#{name} is too long"
  end

  if partitions.length.zero?
    raise Kazoo::ValidationError, "The topic has no partitions defined"
  end

  partitions.each { |part| part.validate }
  true
end