Class: Kafka::OffsetManager
- Inherits:
-
Object
- Object
- Kafka::OffsetManager
- Defined in:
- lib/kafka/offset_manager.rb
Instance Method Summary collapse
- #clear_offsets ⇒ Object
- #commit_offsets ⇒ Object
- #commit_offsets_if_necessary ⇒ Object
-
#initialize(cluster:, group:, logger:, commit_interval:, commit_threshold:) ⇒ OffsetManager
constructor
A new instance of OffsetManager.
- #mark_as_processed(topic, partition, offset) ⇒ Object
- #next_offset_for(topic, partition) ⇒ Object
- #set_default_offset(topic, default_offset) ⇒ Object
Constructor Details
#initialize(cluster:, group:, logger:, commit_interval:, commit_threshold:) ⇒ OffsetManager
Returns a new instance of OffsetManager.
3 4 5 6 7 8 9 10 11 12 13 14 15 |
# File 'lib/kafka/offset_manager.rb', line 3 def initialize(cluster:, group:, logger:, commit_interval:, commit_threshold:) @cluster = cluster @group = group @logger = logger @commit_interval = commit_interval @commit_threshold = commit_threshold @uncommitted_offsets = 0 @processed_offsets = {} @default_offsets = {} @committed_offsets = nil @last_commit = Time.at(0) end |
Instance Method Details
#clear_offsets ⇒ Object
65 66 67 68 69 |
# File 'lib/kafka/offset_manager.rb', line 65 def clear_offsets @uncommitted_offsets = 0 @processed_offsets.clear @committed_offsets = nil end |
#commit_offsets ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/kafka/offset_manager.rb', line 46 def commit_offsets unless @processed_offsets.empty? @logger.info "Committing offsets for #{@uncommitted_offsets} messages" @group.commit_offsets(@processed_offsets) @last_commit = Time.now @uncommitted_offsets = 0 @committed_offsets = nil end end |
#commit_offsets_if_necessary ⇒ Object
59 60 61 62 63 |
# File 'lib/kafka/offset_manager.rb', line 59 def commit_offsets_if_necessary if seconds_since_last_commit >= @commit_interval || commit_threshold_reached? commit_offsets end end |
#mark_as_processed(topic, partition, offset) ⇒ Object
21 22 23 24 25 |
# File 'lib/kafka/offset_manager.rb', line 21 def mark_as_processed(topic, partition, offset) @uncommitted_offsets += 1 @processed_offsets[topic] ||= {} @processed_offsets[topic][partition] = offset + 1 end |
#next_offset_for(topic, partition) ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/kafka/offset_manager.rb', line 27 def next_offset_for(topic, partition) offset = @processed_offsets.fetch(topic, {}).fetch(partition) { committed_offset_for(topic, partition) } # A negative offset means that no offset has been committed, so we need to # resolve the default offset for the topic. if offset < 0 offset = @default_offsets.fetch(topic) offset = @cluster.resolve_offset(topic, partition, offset) # Make sure we commit this offset so that we don't repeat have to # resolve the default offset every time. mark_as_processed(topic, partition, offset - 1) end offset end |
#set_default_offset(topic, default_offset) ⇒ Object
17 18 19 |
# File 'lib/kafka/offset_manager.rb', line 17 def set_default_offset(topic, default_offset) @default_offsets[topic] = default_offset end |