Class: Ktl::RackAwareShufflePlan

Inherits:
RendezvousShufflePlan show all
Defined in:
lib/ktl/shuffle_plan.rb

Instance Method Summary collapse

Methods inherited from ShufflePlan

#generate, #generate_for_new_topic

Constructor Details

#initialize(*args) ⇒ RackAwareShufflePlan

Returns a new instance of RackAwareShufflePlan.



89
90
91
92
# File 'lib/ktl/shuffle_plan.rb', line 89

# Builds a RackAwareShufflePlan.
#
# Every argument is handed on unchanged to the parent class constructor;
# afterwards an empty hash is stored in +@rack_mappings+.
#
# NOTE(review): +@rack_mappings+ presumably caches broker => rack lookups
# for +rack_for+ -- that helper is not visible here, so confirm.
#
# @param args [Array] arguments forwarded verbatim to the superclass
def initialize(*args)
  # Bare `super` re-sends exactly the arguments (and block) we received.
  super
  @rack_mappings = Hash.new
end

Instance Method Details

#assign_replicas_to_brokers(topic, brokers, partition_count, replica_count) ⇒ Object



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/ktl/shuffle_plan.rb', line 94

# Picks, for every partition of +topic+, the brokers that should hold its
# replicas, spreading the choice across racks.
#
# Brokers are grouped by the value +rack_for+ returns for them. For each
# partition every broker is scored with a MurmurHash of
# (partition, topic, broker) and ranked by that score inside its own rack.
# All brokers are then ordered by (rank within rack, score, broker id), so
# the top-ranked broker of every rack comes before any rack's runner-up,
# and the first +replica_count+ of them are selected.
#
# @param topic [String] topic name; part of the hashed key
# @param brokers [Enumerable<Integer>] candidate broker ids
# @param partition_count [Integer] number of partitions to assign
# @param replica_count [Integer] replicas wanted per partition
# @return [Array] one Scala::Tuple per partition, pairing the partition
#   number with a Scala list of the selected broker ids
# @raise [ArgumentError] if +replica_count+ exceeds the number of brokers
def assign_replicas_to_brokers(topic, brokers, partition_count, replica_count)
  broker_count = brokers.size
  if replica_count > broker_count
    raise ArgumentError, sprintf('replication factor: %i larger than available brokers: %i', replica_count, broker_count)
  end

  # The rack layout does not depend on the partition, so compute it once.
  brokers_by_rack = brokers.group_by { |broker| rack_for(broker) }

  partition_count.times.map do |partition|
    ranked = brokers_by_rack.each_value.flat_map do |rack_brokers|
      scored = rack_brokers.map do |broker|
        key = [partition, topic, broker].pack('l<a*l<')
        [Java::OrgJrubyUtil::MurmurHash.hash32(key.to_java_bytes, 0, key.bytesize, SEED), broker]
      end
      # Tuples are laid out as [rank in rack, score, broker id] so that the
      # plain array ordering below is exactly the global sort order we want.
      scored.sort_by(&:first).each_with_index.map do |(score, broker), position|
        [position, score, broker]
      end
    end
    selected = ranked.sort.take(replica_count).map(&:last)
    Scala::Tuple.new(partition, Scala::Collection::JavaConversions.as_scala_iterable(selected).to_list)
  end
end