Class: Ktl::Reassigner

Inherits:
Object
  • Object
show all
Defined in:
lib/ktl/reassigner.rb

Instance Method Summary collapse

Constructor Details

#initialize(zk_client, options = {}) ⇒ Reassigner

Returns a new instance of Reassigner.



5
6
7
8
9
10
11
12
# File 'lib/ktl/reassigner.rb', line 5

def initialize(zk_client, options={})
  @zk_client = zk_client
  @limit = options[:limit]
  @overflow_path = '/ktl/overflow'
  @state_path = '/ktl/reassign'
  @logger = options[:logger] || NullLogger.new
  @log_assignments = !!options[:log_assignments]
end

Instance Method Details

#execute(reassignment) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/ktl/reassigner.rb', line 38

def execute(reassignment)
  reassignments = split(reassignment, @limit)
  actual_reassignment = reassignments.shift
  if @log_assignments
    Scala::Collection::JavaConversions.as_java_iterable(actual_reassignment).each do |pr|
      topic_and_partition, replicas = pr.elements
      brokers = Scala::Collection::JavaConversions.as_java_iterable(replicas).to_a
      @logger.info "Assigning #{topic_and_partition.topic},#{topic_and_partition.partition} to #{brokers.join(',')}"
    end
  end
  json = reassignment_json(actual_reassignment)
  @zk_client.reassign_partitions(json)
  manage_overflow(reassignments)
  manage_progress_state(actual_reassignment)
end

#load_overflowObject



26
27
28
29
30
31
32
33
34
35
36
# File 'lib/ktl/reassigner.rb', line 26

def load_overflow
  overflow = Scala::Collection::Map.empty
  overflow_nodes = @zk_client.get_children(@overflow_path)
  overflow_nodes.foreach do |index|
    overflow_json = @zk_client.read_data(overflow_path(index)).first
    data = parse_reassignment_json(overflow_json)
    overflow = overflow.send('++', data)
  end
  delete_previous_overflow
  overflow
end

#overflow?Boolean

Returns:

  • (Boolean)


19
20
21
22
23
24
# File 'lib/ktl/reassigner.rb', line 19

def overflow?
  overflow_znodes = @zk_client.get_children(@overflow_path)
  overflow_znodes.size > 0
rescue ZkClient::Exception::ZkNoNodeException
  false
end

#reassignment_in_progress?Boolean

Returns:

  • (Boolean)


14
15
16
17
# File 'lib/ktl/reassigner.rb', line 14

def reassignment_in_progress?
  partitions = @zk_client.partitions_being_reassigned
  partitions.size > 0
end