Class: Kazoo::Subscription
- Inherits: Object
  - Object
    - Kazoo::Subscription
- Defined in: lib/kazoo/subscription.rb
Overview
A Kazoo::Subscription describes interest in a set of topics of a Kafka cluster.
Use Kazoo::Subscription.build to instantiate a subscription. It returns one of the two known subclasses of Kazoo::Subscription: a Kazoo::StaticSubscription for a static list of topics, or a Kazoo::PatternSubscription for a dynamic list based on a regular expression that serves as a white list or black list.
Direct Known Subclasses
Kazoo::PatternSubscription, Kazoo::StaticSubscription
Instance Attribute Summary
- #timestamp ⇒ Object (readonly)
  Returns the value of attribute timestamp.
- #version ⇒ Object (readonly)
  Returns the value of attribute version.
Class Method Summary
- .build(subscription, pattern: :white_list, timestamp: Time.now) ⇒ Object
  Instantiates a Kazoo::Subscription based on the subscription argument.
- .everything ⇒ Object
  Instantiates a whitelist subscription that matches every topic.
- .from_json(json_payload) ⇒ Object
  Instantiates a Kazoo::Subscription based on a JSON payload as it is stored in Zookeeper.
- .topic_name(topic) ⇒ Object
  Returns a topic name based on various inputs.
Instance Method Summary
- #eql?(other) ⇒ Boolean (also: #==)
- #has_topic?(topic) ⇒ Boolean
  has_topic? should return true if a given Kazoo::Topic is part of this subscription.
- #hash ⇒ Object
- #inspect ⇒ Object
- #partitions(cluster) ⇒ Object
  Returns an array of all Kazoo::Partition instances in the given Kafka cluster that are matched by this subscription.
- #to_json(options = {}) ⇒ Object
  Returns the JSON representation of this subscription that can be stored in Zookeeper.
- #topics(cluster) ⇒ Object
  Returns an array of all Kazoo::Topic instances in the given Kafka cluster that are matched by this subscription.
Instance Attribute Details
#timestamp ⇒ Object (readonly)
Returns the value of attribute timestamp.
    # File 'lib/kazoo/subscription.rb', line 10
    def timestamp
      @timestamp
    end
#version ⇒ Object (readonly)
Returns the value of attribute version.
    # File 'lib/kazoo/subscription.rb', line 10
    def version
      @version
    end
Class Method Details
.build(subscription, pattern: :white_list, timestamp: Time.now) ⇒ Object
Instantiates a Kazoo::Subscription based on the subscription argument.
- If the subscription argument is the name of a topic, a Kazoo::Topic, or an array of those, it will create a static subscription for the provided topics.
- If the subscription argument is a regular expression, it will create a pattern subscription. The `pattern` argument determines whether it is a white_list (the default) or a black_list.
- If the subscription argument is a Kazoo::Subscription, it will return the argument itself.
    # File 'lib/kazoo/subscription.rb', line 24
    def self.build(subscription, pattern: :white_list, timestamp: Time.now)
      case subscription
      when Kazoo::Subscription
        subscription
      when String, Symbol, Kazoo::Topic, Array
        topic_names = Array(subscription).map { |t| topic_name(t) }
        Kazoo::StaticSubscription.new(topic_names, timestamp: timestamp)
      when Regexp
        Kazoo::PatternSubscription.new(subscription, pattern: pattern, timestamp: timestamp)
      else
        raise ArgumentError, "Don't know how to create a subscription from #{subscription.inspect}"
      end
    end
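As a rough illustration of the dispatch described above, the sketch below mirrors the case statement with stand-in classes instead of the real Kazoo ones. FakeTopic, StaticSketch, PatternSketch, topic_name_sketch and build_sketch are hypothetical names introduced here; only the branching on the argument type is taken from the listing.

```ruby
# Stand-ins for Kazoo::Topic, Kazoo::StaticSubscription and
# Kazoo::PatternSubscription (hypothetical, for illustration only).
class FakeTopic
  attr_reader :name

  def initialize(name)
    @name = name
  end
end

StaticSketch  = Struct.new(:topic_names)
PatternSketch = Struct.new(:regexp, :pattern)

# Normalizes a String, Symbol or topic object to a topic name.
def topic_name_sketch(topic)
  case topic
  when String, Symbol then topic.to_s
  when FakeTopic      then topic.name
  else raise ArgumentError, "Cannot get topic name from #{topic.inspect}"
  end
end

# Mirrors the branching of Kazoo::Subscription.build on the argument type.
def build_sketch(subscription, pattern: :white_list)
  case subscription
  when StaticSketch, PatternSketch
    subscription # already a subscription: returned unchanged
  when String, Symbol, FakeTopic, Array
    StaticSketch.new(Array(subscription).map { |t| topic_name_sketch(t) })
  when Regexp
    PatternSketch.new(subscription, pattern)
  else
    raise ArgumentError, "Don't know how to create a subscription from #{subscription.inspect}"
  end
end

static_sub  = build_sketch([:orders, FakeTopic.new('payments')])
pattern_sub = build_sketch(/\Alogs\./, pattern: :black_list)
same_sub    = build_sketch(static_sub)
```

Here static_sub holds the names "orders" and "payments", pattern_sub keeps the regular expression together with :black_list, and same_sub is the very object that was passed in.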
.everything ⇒ Object
Instantiates a whitelist subscription that matches every topic.
    # File 'lib/kazoo/subscription.rb', line 13
    def self.everything
      build(/.*/)
    end
.from_json(json_payload) ⇒ Object
Instantiates a Kazoo::Subscription based on a JSON payload as it is stored in Zookeeper.
This method will raise Kazoo::InvalidSubscription if the JSON payload cannot be parsed or lacks a required key. Only version 1 payloads are supported.
    # File 'lib/kazoo/subscription.rb', line 42
    def self.from_json(json_payload)
      json = JSON.parse(json_payload)
      version, timestamp = json.fetch('version'), json.fetch('timestamp')
      raise Kazoo::InvalidSubscription, "Only version 1 subscriptions are supported, found version #{version}!" unless version == 1

      time = Time.at(BigDecimal.new(timestamp) / BigDecimal.new(1000))

      pattern, subscription = json.fetch('pattern'), json.fetch('subscription')
      raise Kazoo::InvalidSubscription, "Only subscriptions with a single stream are supported" unless subscription.values.all? { |streams| streams == 1 }

      case pattern
      when 'static'
        topic_names = subscription.keys
        Kazoo::StaticSubscription.new(topic_names, version: version, timestamp: time)

      when 'white_list', 'black_list'
        raise Kazoo::InvalidSubscription, "Only pattern subscriptions with a single expression are supported" unless subscription.keys.length == 1
        regexp = Regexp.new(subscription.keys.first.tr(',', '|'))
        Kazoo::PatternSubscription.new(regexp, pattern: pattern.to_sym, version: version, timestamp: time)

      else
        raise Kazoo::InvalidSubscription, "Unrecognized subscription pattern #{pattern.inspect}"
      end

    rescue JSON::ParserError, KeyError => e
      raise Kazoo::InvalidSubscription.new(e.message)
    end
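To make the payload handling more concrete, here is a minimal sketch of the two conversions performed above, using only the standard library. The sample payload is made up for illustration (its keys follow the fetch calls in the listing), and Rational is swapped in for BigDecimal to turn the millisecond timestamp into a Time.

```ruby
require 'json'

# Hypothetical payload; the keys mirror those fetched by from_json.
payload = JSON.dump({
  'version'      => 1,
  'timestamp'    => 1_500_000_000_123,
  'pattern'      => 'white_list',
  'subscription' => { 'orders,payments' => 1 }
})

json = JSON.parse(payload)

# The timestamp is divided by 1000 before being handed to Time.at, so it is
# stored in milliseconds. A Rational keeps the sub-second part exact.
time = Time.at(Rational(json.fetch('timestamp'), 1000))

# A comma-separated expression becomes a regular-expression alternation.
expression = json.fetch('subscription').keys.first
regexp = Regexp.new(expression.tr(',', '|'))

regexp.source # => "orders|payments"
```

The resulting regexp matches any topic name containing "orders" or "payments"; whether the real pattern subscription anchors the expression is not shown on this page.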
.topic_name(topic) ⇒ Object
Returns a topic name based on various inputs. Helper method used by Kazoo::Subscription.build.
    # File 'lib/kazoo/subscription.rb', line 73
    def self.topic_name(topic)
      case topic
      when String, Symbol; topic.to_s
      when Kazoo::Topic;   topic.name
      else raise ArgumentError, "Cannot get topic name from #{topic.inspect}"
      end
    end
Instance Method Details
#eql?(other) ⇒ Boolean Also known as: ==
    # File 'lib/kazoo/subscription.rb', line 103
    def eql?(other)
      other.kind_of?(Kazoo::Subscription) && other.pattern == pattern && other.subscription == subscription
    end
#has_topic?(topic) ⇒ Boolean
Returns true if the given Kazoo::Topic is part of this subscription. The base class raises NotImplementedError; subclasses are expected to override this method.
    # File 'lib/kazoo/subscription.rb', line 94
    def has_topic?(topic)
      raise NotImplementedError
    end
#hash ⇒ Object
    # File 'lib/kazoo/subscription.rb', line 109
    def hash
      [pattern, subscription].hash
    end
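Defining eql? and hash over the same fields is what lets equal subscriptions collapse in hash-based collections. The sketch below shows that contract with a hypothetical SubscriptionSketch class standing in for the real one; the pattern and subscription readers are assumed from their use in the listings, and the sample values are invented.

```ruby
# Hypothetical stand-in that compares and hashes on pattern + subscription.
class SubscriptionSketch
  attr_reader :pattern, :subscription

  def initialize(pattern, subscription)
    @pattern = pattern
    @subscription = subscription
  end

  def eql?(other)
    other.kind_of?(SubscriptionSketch) &&
      other.pattern == pattern &&
      other.subscription == subscription
  end
  alias_method :==, :eql?

  # Must agree with eql?: equal objects produce equal hash values.
  def hash
    [pattern, subscription].hash
  end
end

a = SubscriptionSketch.new(:static, { 'orders' => 1 })
b = SubscriptionSketch.new(:static, { 'orders' => 1 })

owners = { a => 'group-1' }
found  = owners[b]         # looks up the entry stored under a
unique = [a, b].uniq       # duplicates collapse to a single element
```

Without the matching hash method, b would land in a different bucket and the lookup would miss even though a == b holds.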
#inspect ⇒ Object
    # File 'lib/kazoo/subscription.rb', line 113
    def inspect
      "#<#{self.class.name} pattern=#{pattern} subscription=#{subscription.inspect}>"
    end
#partitions(cluster) ⇒ Object
Returns an array of all Kazoo::Partition instances in the given Kafka cluster that are matched by this subscription.
    # File 'lib/kazoo/subscription.rb', line 89
    def partitions(cluster)
      topics(cluster).flat_map { |topic| topic.partitions }
    end
#to_json(options = {}) ⇒ Object
Returns the JSON representation of this subscription that can be stored in Zookeeper.
    # File 'lib/kazoo/subscription.rb', line 99
    def to_json(options = {})
      JSON.dump(as_json(options))
    end
#topics(cluster) ⇒ Object
Returns an array of all Kazoo::Topic instances in the given Kafka cluster that are matched by this subscription.
    # File 'lib/kazoo/subscription.rb', line 83
    def topics(cluster)
      cluster.topics.values.select { |topic| has_topic?(topic) }
    end
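Both #topics and #partitions rely on subclasses to supply has_topic?. A minimal sketch of that arrangement follows, using hypothetical stand-ins (TopicSketch, ClusterSketch, BaseSketch, StaticListSketch). How the real subclasses implement has_topic? is not shown on this page, so the name-lookup version here is an assumption, as are the partition labels.

```ruby
# Stand-ins for Kazoo::Topic and the cluster (hypothetical).
TopicSketch   = Struct.new(:name, :partitions)
ClusterSketch = Struct.new(:topics) # topics: Hash of name => TopicSketch

class BaseSketch
  # Subclasses decide which topics belong to the subscription.
  def has_topic?(topic)
    raise NotImplementedError
  end

  # Filters the cluster's topics through has_topic?.
  def topics(cluster)
    cluster.topics.values.select { |topic| has_topic?(topic) }
  end

  # Flattens the partitions of every matched topic into one array.
  def partitions(cluster)
    topics(cluster).flat_map { |topic| topic.partitions }
  end
end

# Assumed behaviour: a static subscription matches topics by name.
class StaticListSketch < BaseSketch
  def initialize(topic_names)
    @topic_names = topic_names
  end

  def has_topic?(topic)
    @topic_names.include?(topic.name)
  end
end

cluster = ClusterSketch.new({
  'orders'   => TopicSketch.new('orders',   %w[orders/0 orders/1]),
  'payments' => TopicSketch.new('payments', %w[payments/0])
})

sub = StaticListSketch.new(['orders'])
matched_names      = sub.topics(cluster).map(&:name)
matched_partitions = sub.partitions(cluster)
```

Calling topics on BaseSketch itself raises NotImplementedError, which is the same outcome the base class listing above produces.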