Module: Kafka::Admin
- Defined in:
- lib/ext/kafka.rb
Constant Summary collapse
- TopicCommandOptions =
TopicCommand::TopicCommandOptions
Class Method Summary collapse
- .assign_replicas_to_brokers(brokers, partitions, repl_factor, index = -1,, partition = -1)) ⇒ Object
- .get_broker_metadatas(zk_client, brokers, force_rack = true) ⇒ Object
- .preferred_replica(zk_client, topics_partitions) ⇒ Object
- .to_topic_options(hash) ⇒ Object
Class Method Details
.assign_replicas_to_brokers(brokers, partitions, repl_factor, index = -1,, partition = -1)) ⇒ Object
112 113 114 115 |
# File 'lib/ext/kafka.rb', line 112 def self.assign_replicas_to_brokers(brokers, partitions, repl_factor, index=-1, partition=-1) assignment = AdminUtils.assign_replicas_to_brokers(brokers, partitions.to_java(:int), repl_factor.to_java(:int), index.to_java(:int), partition.to_java(:int)) ScalaEnumerable.new(assignment) end |
.get_broker_metadatas(zk_client, brokers, force_rack = true) ⇒ Object
117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/ext/kafka.rb', line 117 def self.(zk_client, brokers, force_rack = true) rack_aware = if force_rack JRuby.runtime.jruby_class_loader.load_class('kafka.admin.RackAwareMode$Enforced$').get_declared_field('MODULE$').get(nil) else JRuby.runtime.jruby_class_loader.load_class('kafka.admin.RackAwareMode$Safe$').get_declared_field('MODULE$').get(nil) end = Kafka::Admin::AdminUtils.( zk_client.utils, rack_aware, Scala::Option[Scala::Collection::JavaConversions.as_scala_iterable(brokers).to_list] ) Scala::Collection::JavaConversions.seq_as_java_list().to_a end |
.preferred_replica(zk_client, topics_partitions) ⇒ Object
108 109 110 |
# File 'lib/ext/kafka.rb', line 108 def self.preferred_replica(zk_client, topics_partitions) PreferredReplicaLeaderElectionCommand.write_preferred_replica_election_data(zk_client, topics_partitions) end |
.to_topic_options(hash) ⇒ Object
94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/ext/kafka.rb', line 94 def self.(hash) = hash.flat_map do |key, value| kafka_key = '--' + key.to_s.gsub('_', '-') if value.is_a?(Hash) value.map { |k, v| [kafka_key, [k, v].join('=')] } elsif value.is_a?(Array) value.map { |v| [kafka_key, v] } else [kafka_key, value].compact end end TopicCommandOptions.new(.flatten) end |