Class: Ktl::RendezvousShufflePlan

Inherits:
ShufflePlan show all
Defined in:
lib/ktl/shuffle_plan.rb

Direct Known Subclasses

RackAwareShufflePlan

Instance Method Summary collapse

Methods inherited from ShufflePlan

#generate, #generate_for_new_topic, #initialize

Constructor Details

This class inherits a constructor from Ktl::ShufflePlan

Instance Method Details

#assign_replicas_to_brokers(topic, brokers, partition_count, replica_count) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/ktl/shuffle_plan.rb', line 67

def assign_replicas_to_brokers(topic, brokers, partition_count, replica_count)
  if replica_count > brokers.size
    raise ArgumentError, sprintf('replication factor: %i larger than available brokers: %i', replica_count, brokers.size)
  end
  result = []
  partition_count.times do |partition|
    sorted = brokers.sort_by do |broker|
      key = [partition, topic, broker].pack('l<a*l<')
      Java::OrgJrubyUtil::MurmurHash.hash32(key.to_java_bytes, 0, key.bytesize, SEED)
    end
    selected = sorted.take(replica_count)
    result.push(Scala::Tuple.new(partition, Scala::Collection::JavaConversions.as_scala_iterable(selected).to_list))
  end
  result
end