46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
# File 'lib/kafka/fetch_operation.rb', line 46
# Fetches messages for every requested topic partition and wraps the results
# in batches.
#
# Steps, in order:
# 1. Registers the requested topic names with the cluster and refreshes its
#    metadata if necessary.
# 2. Groups the requested partitions by the broker the cluster reports as
#    their leader.
# 3. For each such broker, resolves offsets and issues a single fetch call
#    covering all of that broker's partitions.
# 4. Checks every returned partition's error code and converts its messages
#    into FetchedMessage objects collected in a FetchedBatch.
#
# @return [Array<FetchedBatch>] one batch per partition returned by the brokers
# @raise [NoPartitionsToFetchFrom] if none of the requested topics has any partitions
# @raise [Kafka::OffsetOutOfRange] re-raised with topic, partition and the
#   requested fetch offset attached
# @raise [Kafka::Error] any other protocol error, after it has been logged
def execute
  @cluster.add_target_topics(@topics.keys)
  @cluster.refresh_metadata_if_necessary!

  raise NoPartitionsToFetchFrom if @topics.none? { |_topic, partitions| partitions.any? }

  # Shape: { leader broker => { topic => { partition => options } } }
  requests_by_leader = @topics.each_with_object({}) do |(topic, partitions), grouped|
    partitions.each do |partition, options|
      leader = @cluster.get_leader(topic, partition)
      topics_for_leader = (grouped[leader] ||= {})
      partitions_for_topic = (topics_for_leader[topic] ||= {})
      partitions_for_topic[partition] = options
    end
  end

  requests_by_leader.flat_map do |leader, requested|
    resolve_offsets(leader, requested)

    reply = leader.fetch_messages(
      # NOTE(review): presumably converts seconds to the milliseconds the
      # broker expects -- confirm against the broker API.
      max_wait_time: @max_wait_time * 1000,
      min_bytes: @min_bytes,
      max_bytes: @max_bytes,
      topics: requested,
    )

    reply.topics.flat_map do |topic_result|
      topic_result.partitions.map do |partition_result|
        begin
          Protocol.handle_error(partition_result.error_code)
        rescue Kafka::OffsetOutOfRange => error
          # Attach what was asked for so the caller can tell which offset was rejected.
          error.topic = topic_result.name
          error.partition = partition_result.partition
          requested_partition = requested.fetch(error.topic).fetch(error.partition)
          error.offset = requested_partition.fetch(:fetch_offset)
          raise error
        rescue Kafka::Error => error
          @logger.error "Failed to fetch from #{topic_result.name}/#{partition_result.partition}: #{error.message}"
          raise error
        end

        wrapped_messages = partition_result.messages.map do |raw_message|
          FetchedMessage.new(
            message: raw_message,
            topic: topic_result.name,
            partition: partition_result.partition,
          )
        end

        FetchedBatch.new(
          topic: topic_result.name,
          partition: partition_result.partition,
          highwater_mark_offset: partition_result.highwater_mark_offset,
          messages: wrapped_messages,
        )
      end
    end
  end
rescue Kafka::ConnectionError, Kafka::LeaderNotAvailable, Kafka::NotLeaderForPartition
  # Flag the cluster's cached state as stale before propagating the failure.
  @cluster.mark_as_stale!
  raise
end
|