Class: Kafka::OffsetManager
- Inherits: Object
- Inheritance hierarchy: Object → Kafka::OffsetManager
- Defined in:
- lib/kafka/offset_manager.rb
Instance Method Summary collapse
- #clear_offsets ⇒ Object
- #commit_offsets ⇒ Object
- #commit_offsets_if_necessary ⇒ Object
- #initialize(group:, logger:, commit_interval:, commit_threshold:) ⇒ OffsetManager (constructor) — returns a new instance of OffsetManager.
- #mark_as_processed(topic, partition, offset) ⇒ Object
- #next_offset_for(topic, partition) ⇒ Object
- #set_default_offset(topic, default_offset) ⇒ Object
Constructor Details
#initialize(group:, logger:, commit_interval:, commit_threshold:) ⇒ OffsetManager
Returns a new instance of OffsetManager.
3 4 5 6 7 8 9 10 11 12 13 14 |
# File 'lib/kafka/offset_manager.rb', line 3
# Builds an offset manager with empty tracking state.
#
# @param group [#commit_offsets] receiver of the processed offsets when a
#   commit is performed
# @param logger [#info] sink for the message logged on each commit
# @param commit_interval [Numeric] compared against the seconds elapsed since
#   the last commit when deciding whether a commit is due
# @param commit_threshold [Object] stored as-is; presumably a message count
#   consulted by a helper not shown here -- confirm against that helper
def initialize(group:, logger:, commit_interval:, commit_threshold:)
  # Collaborators and tuning knobs supplied by the caller.
  @group, @logger = group, logger
  @commit_interval, @commit_threshold = commit_interval, commit_threshold

  # Nothing has been processed or configured yet.
  @processed_offsets, @default_offsets = {}, {}
  @uncommitted_offsets = 0
  @committed_offsets = nil

  # The last-commit timestamp starts at the Unix epoch.
  @last_commit = Time.at(0)
end
Instance Method Details
#clear_offsets ⇒ Object
54 55 56 57 58 |
# File 'lib/kafka/offset_manager.rb', line 54
# Discards all locally tracked offset state: empties the processed-offset
# map in place, zeroes the uncommitted counter and drops the cached
# committed offsets.
#
# @return [nil]
def clear_offsets
  @processed_offsets.clear
  @uncommitted_offsets = 0
  @committed_offsets = nil
end
#commit_offsets ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/kafka/offset_manager.rb', line 36
# Sends every processed-but-uncommitted offset to the group and resets the
# local bookkeeping. Does nothing when no offsets have been marked.
#
# @return [Integer, nil] 0 after a commit, nil when there was nothing to commit
def commit_offsets
  return if @processed_offsets.empty?

  @logger.info "Committing offsets for #{@uncommitted_offsets} messages"
  @group.commit_offsets(@processed_offsets)

  # Bookkeeping is reset only after the group call has returned, so an
  # exception raised there leaves the pending offsets untouched.
  @last_commit = Time.now
  @processed_offsets.clear
  @uncommitted_offsets = 0
end
#commit_offsets_if_necessary ⇒ Object
48 49 50 51 52 |
# File 'lib/kafka/offset_manager.rb', line 48
# Commits offsets when either the commit interval has elapsed or the
# commit-threshold helper reports true; otherwise does nothing.
#
# @return [Object, nil] whatever #commit_offsets returns, or nil when no
#   commit was triggered
def commit_offsets_if_necessary
  interval_elapsed = seconds_since_last_commit >= @commit_interval

  # The threshold helper is consulted only when the interval has not elapsed.
  return unless interval_elapsed || commit_threshold_reached?

  commit_offsets
end
#mark_as_processed(topic, partition, offset) ⇒ Object
20 21 22 23 24 |
# File 'lib/kafka/offset_manager.rb', line 20
# Records that the message at +offset+ in the given topic/partition has been
# handled, and bumps the count of uncommitted messages.
#
# @param topic [Object] key into the processed-offset map
# @param partition [Object] key into the per-topic map
# @param offset [Integer] offset of the message just processed
# @return [Integer] the stored value, i.e. offset + 1
def mark_as_processed(topic, partition, offset)
  @uncommitted_offsets += 1

  partitions = (@processed_offsets[topic] ||= {})
  # The stored value is one past the processed offset.
  partitions[partition] = offset + 1
end
#next_offset_for(topic, partition) ⇒ Object
26 27 28 29 30 31 32 33 34 |
# File 'lib/kafka/offset_manager.rb', line 26
# Determines the offset to resume from for a topic/partition.
#
# The locally processed offset wins when present; otherwise the value from
# committed_offset_for is used. A negative result is replaced by the
# topic's configured default offset.
#
# @param topic [Object] topic key
# @param partition [Object] partition key
# @return [Object] the processed or committed offset, or the topic default
# @raise [KeyError] if the offset is negative and no default is set for topic
def next_offset_for(topic, partition)
  partitions = @processed_offsets.fetch(topic, {})
  candidate = partitions.fetch(partition) { committed_offset_for(topic, partition) }

  candidate < 0 ? @default_offsets.fetch(topic) : candidate
end
#set_default_offset(topic, default_offset) ⇒ Object
16 17 18 |
# File 'lib/kafka/offset_manager.rb', line 16
# Registers the offset to fall back on for +topic+, replacing any default
# previously stored for that topic.
#
# @param topic [Object] topic key
# @param default_offset [Object] value stored as the topic's default
# @return [Object] the default_offset that was stored
def set_default_offset(topic, default_offset)
  @default_offsets.store(topic, default_offset)
end