Class: Kafka::OffsetManager
- Inherits:
-
Object
- Object
- Kafka::OffsetManager
- Defined in:
- lib/kafka/offset_manager.rb
Constant Summary collapse
- DEFAULT_RETENTION_TIME =
The default broker setting for offsets.retention.minutes is 1440.
1440 * 60
Instance Method Summary collapse
- #clear_offsets ⇒ Object
- #clear_offsets_excluding(excluded) ⇒ Object
- #commit_offsets(recommit = false) ⇒ Object
- #commit_offsets_if_necessary ⇒ Object
-
#initialize(cluster:, group:, logger:, commit_interval:, commit_threshold:, offset_retention_time:) ⇒ OffsetManager
constructor
A new instance of OffsetManager.
- #mark_as_processed(topic, partition, offset) ⇒ Object
- #next_offset_for(topic, partition) ⇒ Object
- #seek_to_default(topic, partition) ⇒ Object
- #set_default_offset(topic, default_offset) ⇒ Object
Constructor Details
#initialize(cluster:, group:, logger:, commit_interval:, commit_threshold:, offset_retention_time:) ⇒ OffsetManager
Returns a new instance of OffsetManager.
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
# File 'lib/kafka/offset_manager.rb', line 7 def initialize(cluster:, group:, logger:, commit_interval:, commit_threshold:, offset_retention_time:) @cluster = cluster @group = group @logger = logger @commit_interval = commit_interval @commit_threshold = commit_threshold @uncommitted_offsets = 0 @processed_offsets = {} @default_offsets = {} @committed_offsets = nil @resolved_offsets = {} @last_commit = Time.now @last_recommit = nil @recommit_interval = (offset_retention_time || DEFAULT_RETENTION_TIME) / 2 end |
Instance Method Details
#clear_offsets ⇒ Object
80 81 82 83 84 85 86 |
# File 'lib/kafka/offset_manager.rb', line 80 def clear_offsets @processed_offsets.clear @resolved_offsets.clear # Clear the cached commits from the brokers. @committed_offsets = nil end |
#clear_offsets_excluding(excluded) ⇒ Object
88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/kafka/offset_manager.rb', line 88 def clear_offsets_excluding(excluded) # Clear all offsets that aren't in `excluded`. @processed_offsets.each do |topic, partitions| partitions.keep_if do |partition, _| excluded.fetch(topic, []).include?(partition) end end # Clear the cached commits from the brokers. @committed_offsets = nil @resolved_offsets.clear end |
#commit_offsets(recommit = false) ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/kafka/offset_manager.rb', line 58 def commit_offsets(recommit = false) offsets = offsets_to_commit(recommit) unless offsets.empty? @logger.info "Committing offsets#{recommit ? ' with recommit' : ''}: #{prettify_offsets(offsets)}" @group.commit_offsets(offsets) @last_commit = Time.now @last_recommit = Time.now if recommit @uncommitted_offsets = 0 @committed_offsets = nil end end |
#commit_offsets_if_necessary ⇒ Object
73 74 75 76 77 78 |
# File 'lib/kafka/offset_manager.rb', line 73 def commit_offsets_if_necessary recommit = recommit_timeout_reached? if recommit || commit_timeout_reached? || commit_threshold_reached? commit_offsets(recommit) end end |
#mark_as_processed(topic, partition, offset) ⇒ Object
28 29 30 31 32 33 34 35 36 |
# File 'lib/kafka/offset_manager.rb', line 28 def mark_as_processed(topic, partition, offset) @uncommitted_offsets += 1 @processed_offsets[topic] ||= {} # The committed offset should always be the offset of the next message that the # application will read, thus adding one to the last message processed @processed_offsets[topic][partition] = offset + 1 @logger.debug "Marking #{topic}/#{partition}:#{offset} as committed" end |
#next_offset_for(topic, partition) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/kafka/offset_manager.rb', line 43 def next_offset_for(topic, partition) offset = @processed_offsets.fetch(topic, {}).fetch(partition) { committed_offset_for(topic, partition) } # A negative offset means that no offset has been committed, so we need to # resolve the default offset for the topic. if offset < 0 resolve_offset(topic, partition) else # The next offset is the last offset. offset end end |
#seek_to_default(topic, partition) ⇒ Object
38 39 40 41 |
# File 'lib/kafka/offset_manager.rb', line 38 def seek_to_default(topic, partition) @processed_offsets[topic] ||= {} @processed_offsets[topic][partition] = -1 end |
#set_default_offset(topic, default_offset) ⇒ Object
24 25 26 |
# File 'lib/kafka/offset_manager.rb', line 24 def set_default_offset(topic, default_offset) @default_offsets[topic] = default_offset end |