51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
# File 'lib/kafka/fetch_operation.rb', line 51
# Fetches from every requested partition, grouping the requests by the broker
# that the cluster reports as leader for each partition.
#
# @return [Array] one element per partition found in the broker responses;
#   each element is whatever Kafka::FetchedBatchGenerator#generate returns.
# @raise [NoPartitionsToFetchFrom] if no topic has any partition registered.
# @raise [Kafka::OffsetOutOfRange] re-raised after its topic, partition and
#   offset attributes have been filled in.
# @raise [Kafka::Error] any other error produced for a partition's error code;
#   it is logged before being re-raised.
def execute
  @cluster.add_target_topics(@topics.keys)
  @cluster.refresh_metadata_if_necessary!

  has_partitions = @topics.any? { |_topic, partitions| partitions.any? }
  raise NoPartitionsToFetchFrom unless has_partitions

  # Build leader => topic => partition => options, so that each broker
  # receives a single fetch request covering everything it leads.
  requests_by_leader = {}
  @topics.each do |topic, partitions|
    partitions.each do |partition, partition_options|
      leader = @cluster.get_leader(topic, partition)
      leader_topics = (requests_by_leader[leader] ||= {})
      (leader_topics[topic] ||= {})[partition] = partition_options
    end
  end

  requests_by_leader.flat_map do |leader, requested|
    @offset_resolver.resolve!(leader, requested)

    # NOTE(review): the * 1000 presumably converts seconds to milliseconds
    # for the broker call; confirm against the broker API.
    response = leader.fetch_messages(
      max_wait_time: @max_wait_time * 1000,
      min_bytes: @min_bytes,
      max_bytes: @max_bytes,
      topics: requested
    )

    response.topics.flat_map do |fetched_topic|
      fetched_topic.partitions.map do |fetched_partition|
        topic_name = fetched_topic.name
        partition_id = fetched_partition.partition

        begin
          Protocol.handle_error(fetched_partition.error_code)
        rescue Kafka::OffsetOutOfRange => e
          # Attach the request context to the error before passing it on.
          e.topic = topic_name
          e.partition = partition_id
          e.offset = requested.fetch(topic_name).fetch(partition_id).fetch(:fetch_offset)
          raise e
        rescue Kafka::Error => e
          @logger.error "Failed to fetch from #{topic_name}/#{partition_id}: #{e.message}"
          raise e
        end

        fetch_offset = requested.fetch(topic_name).fetch(partition_id).fetch(:fetch_offset)
        Kafka::FetchedBatchGenerator.new(
          topic_name,
          fetched_partition,
          fetch_offset,
          logger: @logger
        ).generate
      end
    end
  end
rescue Kafka::ConnectionError, Kafka::LeaderNotAvailable, Kafka::NotLeaderForPartition
  # Flag the cluster as stale before propagating the error.
  @cluster.mark_as_stale!
  raise
end
|