Class: Kafka::FetchedOffsetResolver

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/fetched_offset_resolver.rb

Instance Method Summary collapse

Constructor Details

#initialize(logger:) ⇒ FetchedOffsetResolver

Returns a new instance of FetchedOffsetResolver


5
6
7
# File 'lib/kafka/fetched_offset_resolver.rb', line 5

# Builds a resolver that writes its diagnostics through a tagged logger.
#
# @param logger [Object] the logger to wrap; it is handed to
#   `TaggedLogger.new` and the wrapper is kept in `@logger`.
def initialize(logger:)
  tagged_logger = TaggedLogger.new(logger)
  @logger = tagged_logger
end

Instance Method Details

#resolve!(broker, topics) ⇒ Object


9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/kafka/fetched_offset_resolver.rb', line 9

# Replaces the fetch offsets of pending partitions with offsets looked up
# from the broker. `topics` is mutated in place.
#
# Which partitions count as pending is decided by `filter_pending_topics`
# (defined elsewhere in this class); when it reports none, no broker
# request is made.
#
# @param broker [#list_offsets] must accept `list_offsets(topics:)` and
#   return an object responding to `offset_for(topic, partition)`.
# @param topics [Hash] indexed as `topics[topic][partition][:fetch_offset]`.
# @return [Object] `topics` itself when nothing is pending; otherwise the
#   value returned by iterating the pending collection with `each`
#   (presumably the pending collection itself -- callers should not rely
#   on it).
def resolve!(broker, topics)
  unresolved = filter_pending_topics(topics)
  return topics if unresolved.empty?

  offsets = broker.list_offsets(topics: unresolved)

  unresolved.each do |topic, partition_requests|
    partition_requests.each do |request|
      partition = request.fetch(:partition)
      offset = offsets.offset_for(topic, partition)
      @logger.debug("Offset for #{topic}/#{partition} is #{offset.inspect}")

      # Fall back to offset 0 when the response carries no offset for
      # this topic/partition.
      partition_options = topics[topic][partition]
      partition_options[:fetch_offset] = offset || 0
    end
  end
end