Method: Karafka::Pro::Processing::Collapser#collapse_until!

Defined in:
lib/karafka/pro/processing/collapser.rb

#collapse_until!(offset) ⇒ Object

Collapse until given offset. Until given offset is encountered or offset bigger than that we keep collapsing.

Parameters:

  • offset (Integer)

    offset until which we keep the collapse



34
35
36
37
38
39
40
41
# File 'lib/karafka/pro/processing/collapser.rb', line 34

def collapse_until!(offset)
  @mutex.synchronize do
    # We check it here in case after a pause and re-fetch we would get less messages and
    # one of them would cause an error. We do not want to overwrite the offset here unless
    # it is bigger.
    @until_offset = offset if offset > @until_offset
  end
end