Module: Kafka::Admin

Defined in:
lib/ext/kafka.rb

Constant Summary collapse

TopicCommandOptions =
TopicCommand::TopicCommandOptions

Class Method Summary collapse

Class Method Details

.assign_replicas_to_brokers(brokers, partitions, repl_factor, index = -1, partition = -1) ⇒ Object



112
113
114
115
# File 'lib/ext/kafka.rb', line 112

# Delegates to AdminUtils.assign_replicas_to_brokers and wraps the result
# in a ScalaEnumerable.
#
# Every numeric argument is converted with +to_java(:int)+ before being
# handed to AdminUtils; +brokers+ is forwarded untouched.
#
# @param brokers passed through unchanged to AdminUtils
# @param partitions [#to_java] converted with to_java(:int)
# @param repl_factor [#to_java] converted with to_java(:int)
# @param index [#to_java] converted with to_java(:int); defaults to -1
# @param partition [#to_java] converted with to_java(:int); defaults to -1
# @return [ScalaEnumerable] wrapper around whatever AdminUtils returned
def self.assign_replicas_to_brokers(brokers, partitions, repl_factor, index=-1, partition=-1)
  java_ints = [partitions, repl_factor, index, partition].map { |number| number.to_java(:int) }
  ScalaEnumerable.new(AdminUtils.assign_replicas_to_brokers(brokers, *java_ints))
end

.get_broker_metadatas(zk_client, brokers, force_rack = true) ⇒ Object



117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/ext/kafka.rb', line 117

# Fetches broker metadata through Kafka::Admin::AdminUtils and returns it
# as a Ruby Array.
#
# The rack-aware mode argument is obtained reflectively: the class named
# +kafka.admin.RackAwareMode$Enforced$+ (when +force_rack+ is truthy) or
# +kafka.admin.RackAwareMode$Safe$+ (otherwise) is loaded through the JRuby
# class loader and its static +MODULE$+ field is read.
#
# @param zk_client [#utils] object whose +utils+ value is handed to AdminUtils
# @param brokers collection converted with JavaConversions.as_scala_iterable
# @param force_rack [Boolean] selects the Enforced (truthy) or Safe mode
# @return [Array] result of +to_a+ on the Java list view of the metadata
def self.get_broker_metadatas(zk_client, brokers, force_rack = true)
  mode_class_name = force_rack ? 'kafka.admin.RackAwareMode$Enforced$' : 'kafka.admin.RackAwareMode$Safe$'
  mode_class = JRuby.runtime.jruby_class_loader.load_class(mode_class_name)
  rack_aware = mode_class.get_declared_field('MODULE$').get(nil)

  zk_utils = zk_client.utils
  broker_list = Scala::Collection::JavaConversions.as_scala_iterable(brokers).to_list
  metadatas = Kafka::Admin::AdminUtils.get_broker_metadatas(zk_utils, rack_aware, Scala::Option[broker_list])

  Scala::Collection::JavaConversions.seq_as_java_list(metadatas).to_a
end

.preferred_replica(zk_client, topics_partitions) ⇒ Object



108
109
110
# File 'lib/ext/kafka.rb', line 108

# Forwards both arguments, unchanged, to
# PreferredReplicaLeaderElectionCommand.write_preferred_replica_election_data
# and returns whatever that call returns.
#
# NOTE(review): +zk_client+ is passed through as-is; the type expected by
# the Kafka command is not visible here — confirm against callers.
#
# @param zk_client ZooKeeper handle handed straight to the command
# @param topics_partitions partitions handed straight to the command
# @return [Object] the command's return value
def self.preferred_replica(zk_client, topics_partitions)
  PreferredReplicaLeaderElectionCommand.write_preferred_replica_election_data(
    zk_client,
    topics_partitions
  )
end

.to_topic_options(hash) ⇒ Object



94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/ext/kafka.rb', line 94

# Builds a TopicCommandOptions from a Ruby hash by turning it into a flat
# command-line style argument list.
#
# Each key becomes a flag: it is stringified, underscores are replaced by
# dashes and "--" is prepended. The value decides what follows the flag:
#
# * Hash  - the flag is repeated once per entry, followed by "key=value"
# * Array - the flag is repeated once per element, followed by the element
# * nil   - the flag is emitted on its own
# * other - the flag is followed by the value as given
#
# The argument list is fully flattened before being passed on, so nested
# arrays inside values end up as individual arguments.
#
# @param hash [Hash] option names mapped to their values
# @return [TopicCommandOptions] instance built from the flattened arguments
def self.to_topic_options(hash)
  argv = hash.each_with_object([]) do |(name, setting), args|
    flag = "--#{name.to_s.tr('_', '-')}"
    case setting
    when Hash
      setting.each { |k, v| args << flag << [k, v].join('=') }
    when Array
      setting.each { |item| args << flag << item }
    else
      args << flag
      args << setting unless setting.nil?
    end
  end
  TopicCommandOptions.new(argv.flatten)
end