Class: Karafka::Pro::Processing::Collapser
- Inherits: Object
- Defined in: lib/karafka/pro/processing/collapser.rb
Overview
Manages the collapse of virtual partitions. Since any non-virtual partition is effectively a virtual partition of size one, the collapser can be used generically without distinguishing between the two cases.
We need to track the offset until which we want to collapse because, upon pause and retry, rdkafka may purge its buffer. This means we may end up with a smaller or larger (different) dataset, and without tracking the end of the collapse, the collapsed state could flicker. Tracking ensures that the collapse stays in effect until all the messages from the corrupted batch are processed.
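The lifecycle described above can be sketched with a standalone class that mirrors the method bodies shown on this page. `SketchCollapser` is a hypothetical name used only for illustration, and the offsets (a failed batch covering 100..109, collapse requested until 110) are assumed values; which offset Karafka actually passes to `collapse_until!` is not shown here.

```ruby
# Standalone sketch mirroring the logic documented on this page.
# SketchCollapser is a hypothetical name, not part of Karafka.
class SketchCollapser
  def initialize
    @collapsed = false
    @until_offset = -1
    @mutex = Mutex.new
  end

  def collapsed?
    @collapsed
  end

  def collapse_until!(offset)
    @mutex.synchronize do
      @until_offset = offset if offset > @until_offset
    end
  end

  def refresh!(first_offset)
    @mutex.synchronize do
      @collapsed = first_offset < @until_offset
    end
  end
end

collapser = SketchCollapser.new
states = []

# Nothing is collapsed right after initialization.
states << collapser.collapsed?

# Assumed scenario: a batch covering offsets 100..109 failed, so we request
# a collapse until offset 110.
collapser.collapse_until!(110)

# Requesting a collapse does not change the state; only refresh! does.
states << collapser.collapsed?

# The retried batch starts at 100, which is below 110, so we collapse.
collapser.refresh!(100)
states << collapser.collapsed?

# A later batch starting at 110 is past the collapse range.
collapser.refresh!(110)
states << collapser.collapsed?

p states # => [false, false, true, false]
```

Note that the comparison in `refresh!` is strict, so a batch whose first offset equals the tracked offset is no longer collapsed.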
Instance Method Summary
- #collapse_until!(offset) ⇒ Object
  Collapse until given offset.
- #collapsed? ⇒ Boolean
  Whether we should collapse into a single consumer.
- #initialize ⇒ Collapser (constructor)
  When initialized, nothing is collapsed.
- #refresh!(first_offset) ⇒ Object
  Sets the collapse state based on the first collective offset that we are going to process and decides whether we still need to keep the collapse.
Constructor Details
#initialize ⇒ Collapser
When initialized, nothing is collapsed.

# File 'lib/karafka/pro/processing/collapser.rb', line 37

def initialize
  @collapsed = false
  @until_offset = -1
  @mutex = Mutex.new
end
Instance Method Details
#collapse_until!(offset) ⇒ Object
Collapse until given offset. We keep collapsing until we encounter the given offset or an offset bigger than it.

# File 'lib/karafka/pro/processing/collapser.rb', line 51

def collapse_until!(offset)
  @mutex.synchronize do
    # We check it here in case after a pause and re-fetch we would get less messages and
    # one of them would cause an error. We do not want to overwrite the offset here unless
    # it is bigger.
    @until_offset = offset if offset > @until_offset
  end
end
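
The guard in `collapse_until!` means the tracked offset only ever moves forward. The following minimal sketch isolates that behavior; `UntilOffsetSketch` and its `until_offset` reader are hypothetical additions for illustration (the documented class does not expose this value), and the offsets are assumed.

```ruby
# Hypothetical standalone sketch; only the guard logic is taken from this page.
class UntilOffsetSketch
  # Reader added purely so the example can observe the tracked offset.
  attr_reader :until_offset

  def initialize
    @until_offset = -1
    @mutex = Mutex.new
  end

  def collapse_until!(offset)
    @mutex.synchronize do
      # Only ever raise the tracked offset, never lower it.
      @until_offset = offset if offset > @until_offset
    end
  end
end

marker = UntilOffsetSketch.new

# Assumed sequence: a first failure at 110, then a failure in a smaller
# re-fetched batch at 105, then a later failure at 120.
seen = [110, 105, 120].map do |offset|
  marker.collapse_until!(offset)
  marker.until_offset
end

p seen # => [110, 110, 120]
```

The second call with 105 is ignored, so a smaller batch after a re-fetch cannot shorten a collapse that is already in progress.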
#collapsed? ⇒ Boolean
Returns whether we should collapse into a single consumer.

# File 'lib/karafka/pro/processing/collapser.rb', line 44

def collapsed?
  @collapsed
end
#refresh!(first_offset) ⇒ Object
Sets the collapse state based on the first collective offset that we are going to process and decides whether we still need to keep the collapse.

# File 'lib/karafka/pro/processing/collapser.rb', line 63

def refresh!(first_offset)
  @mutex.synchronize do
    @collapsed = first_offset < @until_offset
  end
end