Method: Karafka::Pro::Processing::Collapser#collapse_until!
- Defined in:
  - lib/karafka/pro/processing/collapser.rb
#collapse_until!(offset) ⇒ Object
Collapses until the given offset. We keep collapsing until that offset, or a larger one, is encountered.
# File 'lib/karafka/pro/processing/collapser.rb', line 34

def collapse_until!(offset)
  @mutex.synchronize do
    # We check it here in case after a pause and re-fetch we would get less messages and
    # one of them would cause an error. We do not want to overwrite the offset here unless
    # it is bigger.
    @until_offset = offset if offset > @until_offset
  end
end
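The guard in the method body means the stored offset can only move forward. A minimal sketch of that behavior follows. `SketchCollapser` is a hypothetical stand-in, not the Karafka class; its constructor, the initial value of `-1`, and the `until_offset` reader are assumptions made so the example runs on its own.

```ruby
# Hypothetical stand-in for illustration only; not Karafka::Pro::Processing::Collapser.
class SketchCollapser
  # Reader added for the example so the stored offset can be inspected (assumption).
  attr_reader :until_offset

  def initialize
    # Assumed starting value: lower than any real offset, so the first call always wins.
    @until_offset = -1
    @mutex = Mutex.new
  end

  # Same guard as in the documented method: only ever raise the stored offset.
  def collapse_until!(offset)
    @mutex.synchronize do
      @until_offset = offset if offset > @until_offset
    end
  end
end

collapser = SketchCollapser.new
collapser.collapse_until!(10)
collapser.collapse_until!(5)  # smaller offset, ignored by the guard
puts collapser.until_offset   # prints 10
```

A later call with a smaller offset, such as one triggered by an error in a smaller re-fetched batch, leaves the previously recorded offset in place, which matches the intent described in the source comment.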