Class: Ktl::Reassigner
- Inherits: Object
- Inheritance chain: Object → Ktl::Reassigner
- Defined in:
- lib/ktl/reassigner.rb
Instance Method Summary
- #execute(reassignment) ⇒ Object
- #initialize(zk_client, options = {}) ⇒ Reassigner (constructor): returns a new instance of Reassigner.
- #load_overflow ⇒ Object
- #overflow? ⇒ Boolean
- #reassignment_in_progress? ⇒ Boolean
Constructor Details
#initialize(zk_client, options = {}) ⇒ Reassigner
Returns a new instance of Reassigner.
5 6 7 8 9 10 11 12 |
# File 'lib/ktl/reassigner.rb', line 5
#
# Builds a reassigner around a ZooKeeper client.
#
# @param zk_client [Object] client stored as-is in @zk_client
# @param options [Hash] optional settings
# @option options [Object] :limit stored in @limit (nil when absent)
# @option options [Object] :logger logger to use; a NullLogger is created
#   when this key is absent or falsy
# @option options [Object] :log_assignments truthy to set @log_assignments,
#   which #execute checks before logging each assignment
def initialize(zk_client, options = {})
  @zk_client = zk_client
  @limit = options[:limit]
  @overflow_path = '/ktl/overflow'
  @state_path = '/ktl/reassign'
  @logger = options[:logger] || NullLogger.new
  # Double negation normalises any truthy/falsy value to a strict boolean.
  @log_assignments = !!options[:log_assignments]
end
Instance Method Details
#execute(reassignment) ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/ktl/reassigner.rb', line 38
#
# Submits a reassignment through the ZooKeeper client.
#
# The reassignment is first passed to +split+ together with @limit. The
# first element of the result is submitted now; whatever remains is handed
# to +manage_overflow+. When @log_assignments is set, each entry of the
# submitted part is logged at info level before submission.
#
# @param reassignment [Object] the full reassignment, in whatever form
#   +split+ accepts
# @return [Object] the result of +manage_progress_state+
def execute(reassignment)
  reassignments = split(reassignment, @limit)
  # shift removes the first chunk, leaving only the overflow in the list.
  actual_reassignment = reassignments.shift
  if @log_assignments
    conversions = Scala::Collection::JavaConversions
    conversions.as_java_iterable(actual_reassignment).each do |pr|
      topic_and_partition, replicas = pr.elements
      brokers = conversions.as_java_iterable(replicas).to_a
      @logger.info "Assigning #{topic_and_partition.topic},#{topic_and_partition.partition} to #{brokers.join(',')}"
    end
  end
  json = reassignment_json(actual_reassignment)
  @zk_client.reassign_partitions(json)
  manage_overflow(reassignments)
  manage_progress_state(actual_reassignment)
end
#load_overflow ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/ktl/reassigner.rb', line 26
#
# Reads every child node under @overflow_path, parses each one with
# +parse_reassignment_json+ and merges the results into a single map.
# Once all nodes have been read, +delete_previous_overflow+ is called.
#
# @return [Object] the merged map, starting from Scala::Collection::Map.empty
def load_overflow
  overflow = Scala::Collection::Map.empty
  overflow_nodes = @zk_client.get_children(@overflow_path)
  overflow_nodes.foreach do |index|
    # read_data returns a collection; only its first element is parsed.
    overflow_json = @zk_client.read_data(overflow_path(index)).first
    data = parse_reassignment_json(overflow_json)
    # `++` is not callable with Ruby operator syntax, hence the explicit send.
    overflow = overflow.send('++', data)
  end
  delete_previous_overflow
  overflow
end
#overflow? ⇒ Boolean
19 20 21 22 23 24 |
# File 'lib/ktl/reassigner.rb', line 19
#
# Reports whether any child nodes exist under @overflow_path.
#
# @return [Boolean] true when at least one child exists; false when there
#   are none or when the lookup raises ZkNoNodeException
def overflow?
  overflow_znodes = @zk_client.get_children(@overflow_path)
  overflow_znodes.size > 0
rescue ZkClient::Exception::ZkNoNodeException
  false
end
#reassignment_in_progress? ⇒ Boolean
14 15 16 17 |
# File 'lib/ktl/reassigner.rb', line 14
#
# Reports whether the ZooKeeper client lists any partitions as currently
# being reassigned.
#
# @return [Boolean] true when partitions_being_reassigned is non-empty
def reassignment_in_progress?
  partitions = @zk_client.partitions_being_reassigned
  partitions.size > 0
end