Method: Kafka::ConsumerGroup#join

Defined in:
lib/kafka/consumer_group.rb

#joinObject



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/kafka/consumer_group.rb', line 47

def join
  if @topics.empty?
    raise Kafka::Error, "Cannot join group without at least one topic subscription"
  end

  join_group
  synchronize
rescue NotCoordinatorForGroup
  @logger.error "Failed to find coordinator for group `#{@group_id}`; retrying..."
  sleep 1
  @coordinator = nil
  retry
rescue ConnectionError
  @logger.error "Connection error while trying to join group `#{@group_id}`; retrying..."
  sleep 1
  @cluster.mark_as_stale!
  @coordinator = nil
  retry
end