Class: Karafka::Pro::Processing::Coordinators::VirtualOffsetManager
- Inherits: Object
  - Object
  - Karafka::Pro::Processing::Coordinators::VirtualOffsetManager
- Defined in: lib/karafka/pro/processing/coordinators/virtual_offset_manager.rb
Overview
Manager that keeps track of offsets through a virtualization layer that is local to a given partition assignment. It simplifies offset management for virtual partitions: messages can be marked as consumed virtually, and the real offset is moved behind them as expected.
We still use the regular coordinator "real" offset management and keep the two as separate as possible, because the real seek offset management is also used for pausing, filtering and other features and should not be impacted by the virtual one.
This manager is not thread-safe by itself. It should only be operated from coordinator-locked locations.
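The thread-safety note can be illustrated with a minimal sketch. The classes `SketchTracker` and `SketchCoordinator` below are hypothetical stand-ins, not Karafka classes; they only assume that a non-thread-safe tracker is always reached through a single lock owned by a coordinator-like object.

```ruby
# Hypothetical stand-in for a tracker that is NOT thread-safe by itself
# (an assumption for illustration, not Karafka code).
class SketchTracker
  attr_reader :marked

  def initialize
    @marked = {}
  end

  def mark(offset)
    @marked[offset] = true
  end
end

# Hypothetical coordinator-like wrapper that owns the lock.
class SketchCoordinator
  def initialize
    @mutex = Mutex.new
    @tracker = SketchTracker.new
  end

  # Every access to the tracker goes through the same mutex.
  def synchronize
    @mutex.synchronize { yield @tracker }
  end
end

coordinator = SketchCoordinator.new

# Four workers mark disjoint offsets, always from a locked location.
threads = 4.times.map do |worker|
  Thread.new do
    25.times do |i|
      coordinator.synchronize { |tracker| tracker.mark(worker * 25 + i) }
    end
  end
end
threads.each(&:join)

total = coordinator.synchronize { |tracker| tracker.marked.size }
```

Keeping the lock in the coordinator rather than in the tracker means one lock can cover several related operations at once.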
Instance Attribute Summary
- #groups ⇒ Object (readonly)
  Returns the value of attribute groups.
Instance Method Summary
- #clear ⇒ Object
  Clears the manager for a next collective operation.
- #initialize(topic, partition, offset_metadata_strategy) ⇒ VirtualOffsetManager (constructor)
  A new instance of VirtualOffsetManager.
- #mark(message, offset_metadata) ⇒ Object
  Marks the given message as virtually consumed.
- #mark_until(message, offset_metadata) ⇒ Object
  Marks all offsets from all groups up to and including the offset of the given message.
- #markable ⇒ Array<Messages::Seek, String>
  Markable message for real offset marking and its associated metadata.
- #markable? ⇒ Boolean
  Is there a real offset we can mark as consumed.
- #marked ⇒ Array<Integer>
  Offsets of messages already marked as consumed virtually.
- #register(offsets_group) ⇒ Object
  Registers an offset group coming from one virtual consumer.
Constructor Details
#initialize(topic, partition, offset_metadata_strategy) ⇒ VirtualOffsetManager
We need topic and partition because we use a seek message (virtual) for real offset management. We could keep real message reference but this can be memory consuming and not worth it.
Returns a new instance of VirtualOffsetManager.
# File 'lib/karafka/pro/processing/coordinators/virtual_offset_manager.rb', line 50

def initialize(topic, partition, offset_metadata_strategy)
  @topic = topic
  @partition = partition
  @groups = []
  @marked = {}
  @offsets_metadata = {}
  @real_offset = -1
  @offset_metadata_strategy = offset_metadata_strategy
  @current_offset_metadata = nil
end
Instance Attribute Details
#groups ⇒ Object (readonly)
Returns the value of attribute groups.
# File 'lib/karafka/pro/processing/coordinators/virtual_offset_manager.rb', line 39

def groups
  @groups
end
Instance Method Details
#clear ⇒ Object
Clears the manager for a next collective operation.

# File 'lib/karafka/pro/processing/coordinators/virtual_offset_manager.rb', line 62

def clear
  @groups.clear
  @offsets_metadata.clear
  @current_offset_metadata = nil
  @marked.clear
  @real_offset = -1
end
#mark(message, offset_metadata) ⇒ Object
Marks the given message as virtually consumed. The message offset and all earlier offsets from the same group are marked as done. The real offset representation is then refreshed, as it might have moved to a newer real offset.

# File 'lib/karafka/pro/processing/coordinators/virtual_offset_manager.rb', line 87

def mark(message, offset_metadata)
  offset = message.offset

  # Store metadata when we materialize the most stable offset
  @offsets_metadata[offset] = offset_metadata
  @current_offset_metadata = offset_metadata

  group = @groups.find { |reg_group| reg_group.include?(offset) }

  # This case can happen when someone uses MoM and wants to mark message from a previous
  # batch as consumed. We can add it, since the real offset refresh will point to it
  unless group
    group = [offset]
    @groups << group
  end

  position = group.index(offset)

  # Mark all previous messages from the same group also as virtually consumed
  group[0..position].each do |markable_offset|
    # Set previous messages metadata offset as the offset of higher one for overwrites
    # unless a different metadata was set explicitly
    @offsets_metadata[markable_offset] ||= offset_metadata
    @marked[markable_offset] = true
  end

  # Recompute the real offset representation
  materialize_real_offset
end
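The group-local marking rule can be tried in isolation. The sketch below uses a hypothetical helper, `mark_in_group`, that works on plain arrays and a hash instead of the manager's state, and it leaves out metadata handling and the real offset refresh.

```ruby
# Hypothetical helper (not Karafka code): marks `offset` and every earlier
# offset of the same group as virtually consumed.
def mark_in_group(groups, marked, offset)
  group = groups.find { |reg_group| reg_group.include?(offset) }

  # An offset unknown to all groups gets its own single-element group.
  unless group
    group = [offset]
    groups << group
  end

  position = group.index(offset)
  group[0..position].each { |earlier| marked[earlier] = true }
  marked
end

# Two virtual consumers, each holding an interleaved slice of one partition.
groups = [[0, 2, 4], [1, 3, 5]]
marked = groups.flatten.to_h { |offset| [offset, false] }

mark_in_group(groups, marked, 2)
after_first = marked.select { |_, done| done }.keys.sort

# Offset 9 was never registered, so it is tracked as a new group.
mark_in_group(groups, marked, 9)
after_second = marked.select { |_, done| done }.keys.sort
```

Marking offset 2 also marks offset 0 from the same group, while offsets 1 and 3 from the other group stay pending.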
#mark_until(message, offset_metadata) ⇒ Object
Marks all offsets from all groups up to and including the offset of the given message. Useful for marking when operating in a collapsed state.

# File 'lib/karafka/pro/processing/coordinators/virtual_offset_manager.rb', line 121

def mark_until(message, offset_metadata)
  mark(message, offset_metadata)

  @groups.each do |group|
    group.each do |offset|
      next if offset > message.offset

      @offsets_metadata[offset] = offset_metadata
      @marked[offset] = true
    end
  end

  materialize_real_offset
end
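In contrast to group-local marking, this operation crosses group boundaries. A minimal sketch with a hypothetical helper, `mark_until_offset`, which takes a plain integer limit instead of a message and ignores metadata:

```ruby
# Hypothetical helper (not Karafka code): marks every registered offset that
# is lower than or equal to `limit`, regardless of the group it belongs to.
def mark_until_offset(groups, marked, limit)
  groups.each do |group|
    group.each do |offset|
      next if offset > limit

      marked[offset] = true
    end
  end

  marked
end

groups = [[0, 2, 4], [1, 3, 5]]
marked = groups.flatten.to_h { |offset| [offset, false] }

mark_until_offset(groups, marked, 3)

done = marked.select { |_, status| status }.keys.sort
pending = marked.reject { |_, status| status }.keys.sort
```

With a limit of 3, offsets 0 through 3 end up marked across both groups, and offsets 4 and 5 remain pending.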
#markable ⇒ Array<Messages::Seek, String>
Returns markable message for real offset marking and its associated metadata.
# File 'lib/karafka/pro/processing/coordinators/virtual_offset_manager.rb', line 149

def markable
  raise Errors::InvalidRealOffsetUsageError unless markable?

  offset_metadata = case @offset_metadata_strategy
                    when :exact
                      @offsets_metadata.fetch(@real_offset)
                    when :current
                      @current_offset_metadata
                    else
                      raise Errors::UnsupportedCaseError, @offset_metadata_strategy
                    end

  [
    Messages::Seek.new(
      @topic,
      @partition,
      @real_offset
    ),
    offset_metadata
  ]
end
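The metadata selection can be sketched on its own. The helper `pick_metadata` below is hypothetical and rests on an assumption about the two strategies: `:exact` returns the metadata stored for the real offset, and `:current` returns the most recently provided metadata. It raises `ArgumentError` rather than the Karafka error class.

```ruby
# Hypothetical helper (not Karafka code) mirroring the strategy switch.
# Assumption: :exact looks up per-offset metadata, :current returns the
# latest metadata regardless of the offset being committed.
def pick_metadata(strategy, per_offset, current, real_offset)
  case strategy
  when :exact
    per_offset.fetch(real_offset)
  when :current
    current
  else
    raise ArgumentError, "unsupported strategy: #{strategy.inspect}"
  end
end

per_offset = { 10 => 'meta-for-10', 11 => 'meta-for-11' }
current = 'meta-for-11'
real_offset = 10

exact_choice = pick_metadata(:exact, per_offset, current, real_offset)
current_choice = pick_metadata(:current, per_offset, current, real_offset)
```

When the real offset lags behind the latest marked message, the two strategies can return different metadata for the same commit.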
#markable? ⇒ Boolean
Returns whether there is a real offset we can mark as consumed.

# File 'lib/karafka/pro/processing/coordinators/virtual_offset_manager.rb', line 143

def markable?
  !@real_offset.negative?
end
#marked ⇒ Array<Integer>
Returns offsets of messages already marked as consumed virtually.

# File 'lib/karafka/pro/processing/coordinators/virtual_offset_manager.rb', line 137

def marked
  @marked.select { |_, status| status }.map(&:first).sort
end
#register(offsets_group) ⇒ Object
Registers an offset group coming from one virtual consumer. To move the real underlying offset accordingly, we need to track the offset groups of the virtual consumers independently and only materialize the end result.

# File 'lib/karafka/pro/processing/coordinators/virtual_offset_manager.rb', line 75

def register(offsets_group)
  @groups << offsets_group

  offsets_group.each { |offset| @marked[offset] = false }
end
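The private `materialize_real_offset` method is not shown on this page, so the following is only one plausible reading of "materialize the end result": the real offset is the highest offset below which everything is already marked. Both helpers, `register_group` and `contiguous_real_offset`, are hypothetical.

```ruby
# Hypothetical helper (not Karafka code): tracks a group and starts all of
# its offsets as not yet consumed.
def register_group(groups, marked, offsets_group)
  groups << offsets_group
  offsets_group.each { |offset| marked[offset] = false }
end

# Hypothetical simplification: walk offsets in ascending order and stop at
# the first one that is still pending. Returns -1 when nothing is markable.
def contiguous_real_offset(marked)
  real_offset = -1

  marked.sort_by { |offset, _| offset }.each do |offset, done|
    break unless done

    real_offset = offset
  end

  real_offset
end

groups = []
marked = {}
register_group(groups, marked, [5, 7])
register_group(groups, marked, [6, 8])

before = contiguous_real_offset(marked)

# The first virtual consumer finishes both of its messages.
marked[5] = true
marked[7] = true
partial = contiguous_real_offset(marked)

# Once offset 6 is done, the real offset can jump past 7 as well.
marked[6] = true
advanced = contiguous_real_offset(marked)
```

The real offset cannot pass a pending offset from another group, which is why the groups are tracked independently and only the combined result is committed.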