Class: Kafka::OffsetManager
- Inherits:
-
Object
- Object
- Kafka::OffsetManager
- Defined in:
- lib/kafka/offset_manager.rb
Instance Method Summary collapse
- #clear_offsets ⇒ Object
- #clear_offsets_excluding(excluded) ⇒ Object
- #commit_offsets ⇒ Object
- #commit_offsets_if_necessary ⇒ Object
-
#initialize(cluster:, group:, logger:, commit_interval:, commit_threshold:) ⇒ OffsetManager
constructor
A new instance of OffsetManager.
- #mark_as_processed(topic, partition, offset) ⇒ Object
- #next_offset_for(topic, partition) ⇒ Object
- #seek_to_default(topic, partition) ⇒ Object
- #set_default_offset(topic, default_offset) ⇒ Object
Constructor Details
#initialize(cluster:, group:, logger:, commit_interval:, commit_threshold:) ⇒ OffsetManager
Returns a new instance of OffsetManager.
# Builds an offset manager for a consumer group.
#
# @param cluster [Kafka::Cluster] used to resolve default offsets from brokers.
# @param group [Kafka::ConsumerGroup] the group that offsets are committed to.
# @param logger [Logger] destination for commit/marking log lines.
# @param commit_interval [Integer] seconds between automatic commits.
# @param commit_threshold [Integer] number of uncommitted messages that
#   triggers a commit regardless of the interval.
def initialize(cluster:, group:, logger:, commit_interval:, commit_threshold:)
  # Collaborators.
  @cluster, @group, @logger = cluster, group, logger

  # Commit policy knobs.
  @commit_interval, @commit_threshold = commit_interval, commit_threshold

  # Mutable bookkeeping, reset as commits happen.
  @uncommitted_offsets = 0
  @processed_offsets = {}
  @default_offsets = {}
  @resolved_offsets = {}
  @committed_offsets = nil
  @last_commit = Time.now
end
Instance Method Details
#clear_offsets ⇒ Object
# Forgets all locally tracked processed offsets and drops the cached
# broker-side committed offsets so they are re-fetched on next use.
#
# @return [nil]
def clear_offsets
  # Mutate in place (callers may hold a reference to this hash).
  @processed_offsets.clear

  # Invalidate the cached commits fetched from the brokers.
  @committed_offsets = nil
end
#clear_offsets_excluding(excluded) ⇒ Object
# Forgets tracked processed offsets for every topic/partition that is NOT
# listed in `excluded`, and drops the cached broker-side committed offsets.
#
# @param excluded [Hash{String => Array<Integer>}] partitions to keep,
#   keyed by topic.
# @return [nil]
def clear_offsets_excluding(excluded)
  @processed_offsets.each do |topic, partitions|
    keep = excluded.fetch(topic, [])
    partitions.delete_if { |partition, _| !keep.include?(partition) }
  end

  # Invalidate the cached commits fetched from the brokers.
  @committed_offsets = nil
end
#commit_offsets ⇒ Object
# Commits every tracked processed offset to the consumer group, then resets
# the commit bookkeeping. Does nothing when no offsets have been marked.
#
# @return [nil]
def commit_offsets
  return if @processed_offsets.empty?

  pretty = []
  @processed_offsets.each do |topic, partitions|
    partitions.each do |partition, offset|
      pretty << "#{topic}/#{partition}:#{offset}"
    end
  end
  @logger.info "Committing offsets: #{pretty.join(", ")}"

  @group.commit_offsets(@processed_offsets)

  # Restart the interval/threshold counters and invalidate the cached
  # broker-side commits now that they are stale.
  @last_commit = Time.now
  @uncommitted_offsets = 0
  @committed_offsets = nil
end
#commit_offsets_if_necessary ⇒ Object
# Commits tracked offsets when either the commit interval has elapsed or
# the number of uncommitted messages has crossed the threshold.
#
# @return [nil]
def commit_offsets_if_necessary
  commit_offsets if commit_timeout_reached? || commit_threshold_reached?
end
#mark_as_processed(topic, partition, offset) ⇒ Object
# Records that all messages up to and including `offset` in the given
# topic/partition have been processed, making the offset eligible for the
# next commit.
#
# @param topic [String] the topic name.
# @param partition [Integer] the partition number.
# @param offset [Integer] the offset of the last processed message.
# @return [nil]
def mark_as_processed(topic, partition, offset)
  @uncommitted_offsets += 1

  @processed_offsets[topic] ||= {}
  @processed_offsets[topic][partition] = offset

  # Fixed log message: the offset is only *marked* here — the actual commit
  # to the group happens later in `commit_offsets` — so saying "as
  # committed" was misleading when tracing commit behavior.
  @logger.debug "Marking #{topic}/#{partition}:#{offset} as processed"
end
#next_offset_for(topic, partition) ⇒ Object
# Returns the offset the consumer should resume from for a topic/partition:
# one past the last locally processed offset, falling back to the group's
# committed offset, and resolving the topic's default offset when neither
# exists (signalled by a negative offset).
#
# @param topic [String] the topic name.
# @param partition [Integer] the partition number.
# @return [Integer] the next offset to fetch.
def next_offset_for(topic, partition)
  last = @processed_offsets.fetch(topic, {}).fetch(partition) do
    # Nothing processed locally — ask the group what was last committed.
    committed_offset_for(topic, partition)
  end

  if last >= 0
    # The next offset is the last offset plus one.
    last + 1
  else
    # A negative offset means nothing has been committed, so fall back to
    # the topic's configured default offset.
    resolve_offset(topic, partition)
  end
end
#seek_to_default(topic, partition) ⇒ Object
# Marks a topic/partition with the sentinel offset -1 so that the next
# `next_offset_for` call resolves the topic's default offset instead of
# resuming from a stored position.
#
# @param topic [String] the topic name.
# @param partition [Integer] the partition number.
# @return [Integer] the sentinel value -1.
def seek_to_default(topic, partition)
  partitions = (@processed_offsets[topic] ||= {})
  partitions[partition] = -1
end
#set_default_offset(topic, default_offset) ⇒ Object
# Registers the default offset to use for a topic when no offset has been
# committed (consumed by `resolve_offset` via `@default_offsets`).
#
# @param topic [String] the topic name.
# @param default_offset [Object] the default offset value for the topic.
# @return [Object] the stored default offset.
def set_default_offset(topic, default_offset)
  @default_offsets.store(topic, default_offset)
end