Class: Kazoo::Subscription

Inherits:
Object
Defined in:
lib/kazoo/subscription.rb

Overview

A Kazoo::Subscription describes interest in a set of topics of a Kafka cluster.

Use Kazoo::Subscription.build to instantiate a subscription. It returns one of the two known subclasses of Kazoo::Subscription: a Kazoo::StaticSubscription for a static list of topics, or a Kazoo::PatternSubscription for a dynamic list based on a regular expression that serves as a white list or black list.

Direct Known Subclasses

PatternSubscription, StaticSubscription

Instance Attribute Details

#timestamp ⇒ Object (readonly)

Returns the value of attribute timestamp.



# File 'lib/kazoo/subscription.rb', line 10

def timestamp
  @timestamp
end

#version ⇒ Object (readonly)

Returns the value of attribute version.



# File 'lib/kazoo/subscription.rb', line 10

def version
  @version
end

Class Method Details

.build(subscription, pattern: :white_list, timestamp: Time.now) ⇒ Object

Instantiates a Kazoo::Subscription based on the subscription argument.

  • If the subscription argument is the name of a topic, a Kazoo::Topic, or an array of those, it will create a static subscription for the provided topics.

  • If the subscription argument is a regular expression, it will create a pattern subscription. The `pattern` argument determines whether it is a white_list (the default) or a black_list.

  • If the subscription argument is a Kazoo::Subscription, it will return the argument itself.
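The dispatch rules above can be sketched with stand-in classes. This is a minimal, self-contained illustration and not the library's code: BuildSketch and its Topic, Static, and Pattern classes are hypothetical names invented here so the example runs without the kazoo gem or a Zookeeper connection.

```ruby
# Hypothetical stand-ins. The real classes are Kazoo::Topic,
# Kazoo::StaticSubscription and Kazoo::PatternSubscription.
module BuildSketch
  class Topic
    attr_reader :name

    def initialize(name)
      @name = name
    end
  end

  Static  = Struct.new(:topic_names)
  Pattern = Struct.new(:regexp, :pattern)

  # Mirrors the three documented cases of Kazoo::Subscription.build.
  def self.build(subscription, pattern: :white_list)
    case subscription
    when Static, Pattern
      subscription # already a subscription: returned unchanged
    when String, Symbol, Topic, Array
      Static.new(Array(subscription).map { |t| topic_name(t) })
    when Regexp
      Pattern.new(subscription, pattern)
    else
      raise ArgumentError, "Don't know how to create a subscription from #{subscription.inspect}"
    end
  end

  # Normalizes a String, Symbol, or Topic to a plain topic name.
  def self.topic_name(topic)
    case topic
    when String, Symbol then topic.to_s
    when Topic          then topic.name
    else raise ArgumentError, "Cannot get topic name from #{topic.inspect}"
    end
  end
end

static  = BuildSketch.build([:events, BuildSketch::Topic.new('logs')])
pattern = BuildSketch.build(/\Ainternal\./, pattern: :black_list)
same    = BuildSketch.build(static)
```

Mixed arrays are normalized to plain topic names, and passing an existing subscription returns the very same object rather than a copy.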



# File 'lib/kazoo/subscription.rb', line 24

def self.build(subscription, pattern: :white_list, timestamp: Time.now)
  case subscription
  when Kazoo::Subscription
    subscription
  when String, Symbol, Kazoo::Topic, Array
    topic_names = Array(subscription).map { |t| topic_name(t) }
    Kazoo::StaticSubscription.new(topic_names, timestamp: timestamp)
  when Regexp
    Kazoo::PatternSubscription.new(subscription, pattern: pattern, timestamp: timestamp)
  else
    raise ArgumentError, "Don't know how to create a subscription from #{subscription.inspect}"
  end
end

.everything ⇒ Object

Instantiates a whitelist subscription that matches every topic.



# File 'lib/kazoo/subscription.rb', line 13

def self.everything
  build(/.*/)
end

.from_json(json_payload) ⇒ Object

Instantiates a Kazoo::Subscription based on a JSON payload as it is stored in Zookeeper.

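This method raises Kazoo::InvalidSubscription if the JSON payload cannot be parsed, is missing a required key, or describes a subscription that is not supported. Only version 1 payloads with a single stream per topic are supported, and a pattern subscription must contain exactly one expression.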
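As an illustration of the payload this method reads, the sketch below builds a sample document and applies the same two conversions: the timestamp is treated as milliseconds since the epoch, and commas in a pattern expression become regular expression alternation. The payload values are made up, and storing the timestamp as a string is an assumption. The sketch uses Rational instead of the BigDecimal arithmetic in the library source, purely to stay dependency-free.

```ruby
require 'json'

# Hypothetical payload. The four keys are the ones from_json fetches;
# the values (and the string-typed timestamp) are assumptions.
payload = JSON.dump(
  'version'      => 1,
  'timestamp'    => '1430000000123',
  'pattern'      => 'white_list',
  'subscription' => { 'access_logs,error_logs' => 1 }
)

json = JSON.parse(payload)

# Milliseconds since the epoch, converted to a Time with exact arithmetic.
millis = Integer(json.fetch('timestamp'))
time   = Time.at(Rational(millis, 1000)).utc

# A pattern subscription holds a single key; commas separate alternatives.
expression = json.fetch('subscription').keys.first
regexp     = Regexp.new(expression.tr(',', '|'))
```

Because the resulting expression is not anchored, it matches any topic name that contains one of the alternatives.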



# File 'lib/kazoo/subscription.rb', line 42

def self.from_json(json_payload)
  json = JSON.parse(json_payload)
  version, timestamp = json.fetch('version'), json.fetch('timestamp')
  raise Kazoo::InvalidSubscription, "Only version 1 subscriptions are supported, found version #{version}!" unless version == 1

  time = Time.at(BigDecimal.new(timestamp) / BigDecimal.new(1000))

  pattern, subscription = json.fetch('pattern'), json.fetch('subscription')
  raise Kazoo::InvalidSubscription, "Only subscriptions with a single stream are supported" unless subscription.values.all? { |streams| streams == 1 }

  case pattern
  when 'static'
    topic_names = subscription.keys
    Kazoo::StaticSubscription.new(topic_names, version: version, timestamp: time)

  when 'white_list', 'black_list'
    raise Kazoo::InvalidSubscription, "Only pattern subscriptions with a single expression are supported" unless subscription.keys.length == 1
    regexp = Regexp.new(subscription.keys.first.tr(',', '|'))
    Kazoo::PatternSubscription.new(regexp, pattern: pattern.to_sym, version: version, timestamp: time)

  else
    raise Kazoo::InvalidSubscription, "Unrecognized subscription pattern #{pattern.inspect}"
  end

rescue JSON::ParserError, KeyError => e
  raise Kazoo::InvalidSubscription.new(e.message)
end

.topic_name(topic) ⇒ Object

Returns a topic name for a String, a Symbol, or a Kazoo::Topic, and raises ArgumentError for anything else. Helper method used by Kazoo::Subscription.build.



# File 'lib/kazoo/subscription.rb', line 73

def self.topic_name(topic)
  case topic
    when String, Symbol; topic.to_s
    when Kazoo::Topic;   topic.name
    else raise ArgumentError, "Cannot get topic name from #{topic.inspect}"
  end
end

Instance Method Details

#eql?(other) ⇒ Boolean Also known as: ==

Returns:

  • (Boolean)


# File 'lib/kazoo/subscription.rb', line 103

def eql?(other)
  other.kind_of?(Kazoo::Subscription) && other.pattern == pattern && other.subscription == subscription
end

#has_topic?(topic) ⇒ Boolean

Returns true if the given Kazoo::Topic is part of this subscription. The base class only raises NotImplementedError; subclasses are expected to override this method.

Returns:

  • (Boolean)

Raises:

  • (NotImplementedError)


# File 'lib/kazoo/subscription.rb', line 94

def has_topic?(topic)
  raise NotImplementedError
end

#hash ⇒ Object



# File 'lib/kazoo/subscription.rb', line 109

def hash
  [pattern, subscription].hash
end
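Taken together, #eql? and #hash compare only the pattern and the subscription, so two subscriptions that differ only in timestamp or version behave as the same Hash key. A minimal sketch of that contract follows, using a hypothetical EqualitySketch class that is not part of the library:

```ruby
# Hypothetical stand-in that copies only the equality logic shown above.
class EqualitySketch
  attr_reader :pattern, :subscription, :timestamp

  def initialize(pattern, subscription, timestamp: Time.now)
    @pattern      = pattern
    @subscription = subscription
    @timestamp    = timestamp
  end

  # The timestamp is deliberately left out of the comparison.
  def eql?(other)
    other.kind_of?(EqualitySketch) && other.pattern == pattern && other.subscription == subscription
  end
  alias_method :==, :eql?

  # Must agree with eql? so that equal objects land in the same Hash bucket.
  def hash
    [pattern, subscription].hash
  end
end

first  = EqualitySketch.new(:static, { 'events' => 1 }, timestamp: Time.at(0))
second = EqualitySketch.new(:static, { 'events' => 1 }, timestamp: Time.at(60))

seen = { first => :first }
seen[second] = :second # overwrites the existing entry instead of adding one
```

Defining both methods consistently is what lets subscriptions be deduplicated with Hash, Set, or Array#uniq.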

#inspect ⇒ Object



# File 'lib/kazoo/subscription.rb', line 113

def inspect
  "#<#{self.class.name} pattern=#{pattern} subscription=#{subscription.inspect}>"
end

#partitions(cluster) ⇒ Object

Returns an array of all Kazoo::Partition instances in the given Kafka cluster that are matched by this subscription.



# File 'lib/kazoo/subscription.rb', line 89

def partitions(cluster)
  topics(cluster).flat_map { |topic| topic.partitions }
end

#to_json(options = {}) ⇒ Object

Returns the JSON representation of this subscription that can be stored in Zookeeper.



# File 'lib/kazoo/subscription.rb', line 99

def to_json(options = {})
  JSON.dump(as_json(options))
end

#topics(cluster) ⇒ Object

Returns an array of all Kazoo::Topic instances in the given Kafka cluster that are matched by this subscription.



# File 'lib/kazoo/subscription.rb', line 83

def topics(cluster)
  cluster.topics.values.select { |topic| has_topic?(topic) }
end