Class: Karafka::Pro::Processing::ParallelSegments::Filters::Mom

Inherits:
Base
Defined in:
lib/karafka/pro/processing/parallel_segments/filters/mom.rb

Overview

Note:

This filter should be used only when manual offset management is enabled. For automatic offset management scenarios, use the regular filter instead.

Filter used for handling parallel segments when manual offset management (mom) is enabled. It distributes messages across segments without any post-filtering offset state management, because offset handling is fully user-controlled in this mode.

This is necessary because, with manual offset management, offsets must never be marked automatically, even when all messages in a batch are filtered out.

This separation allows for cleaner implementation and easier debugging of each flow.
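The message distribution described above can be sketched in plain Ruby. This is a minimal illustration under stated assumptions, not Karafka's implementation: `Message`, `PARTITIONER`, `reduce_key`, and `distribute` are hypothetical names, and the checksum-modulo reducer is an assumed example. The sketch shows that every message is kept by exactly one segment and that messages sharing a key always land in the same segment.

```ruby
# Minimal stand-in for a Kafka message (hypothetical, for illustration only).
Message = Struct.new(:key, :payload)

# Hypothetical partitioner: use the message key as the segment key.
PARTITIONER = ->(message) { message.key }

# Hypothetical reducer: map a segment key to a segment id in 0...count.
# String#sum returns a simple checksum of the string's bytes.
def reduce_key(segment_key, count)
  segment_key.to_s.sum % count
end

# Returns { segment_id => messages kept by that segment's filter }.
# Uses reject (not delete_if) so the same input can be reused for every segment.
def distribute(messages, count)
  (0...count).to_h do |segment_id|
    kept = messages.reject do |message|
      reduce_key(PARTITIONER.call(message), count) != segment_id
    end
    [segment_id, kept]
  end
end

messages = %w[user-1 user-2 user-3 user-1 user-4].each_with_index.map do |key, i|
  Message.new(key, "event-#{i}")
end

distribute(messages, 3).each do |segment_id, kept|
  puts "segment #{segment_id}: #{kept.map(&:payload).join(', ')}"
end
```

Because each segment applies the same deterministic partitioner and reducer, no coordination between segments is needed to avoid processing a message twice.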

Instance Attribute Summary

Attributes inherited from Filters::Base

#cursor

Instance Method Summary

Methods inherited from Base

#initialize

Methods inherited from Filters::Base

#action, #initialize, #marking_cursor, #marking_method

Constructor Details

This class inherits a constructor from Karafka::Pro::Processing::ParallelSegments::Filters::Base

Instance Method Details

#applied? ⇒ Boolean

Returns true if any messages were filtered out.

Returns:

  • (Boolean)

    true if any messages were filtered out



# File 'lib/karafka/pro/processing/parallel_segments/filters/mom.rb', line 64

def applied?
  @applied
end

#apply!(messages) ⇒ Object

Applies the filter to the batch of messages. It removes messages that don’t belong to the current parallel segment group, based on the partitioner and reducer logic, without any offset marking.

Parameters:

  • messages

    the batch of messages to filter; it is modified in place

# File 'lib/karafka/pro/processing/parallel_segments/filters/mom.rb', line 46

def apply!(messages)
  @applied = false

  # Filter out messages that don't match our segment group
  messages.delete_if do |message|
    message_segment_key = partition(message)
    # Use the reducer to get the target group for this message
    target_segment = reduce(message_segment_key)
    # Remove the message if it doesn't belong to our segment
    remove = target_segment != @segment_id

    @applied = true if remove

    remove
  end
end
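The `apply!` flow above can be reproduced in a standalone class to see how `@applied` behaves across calls. This is a sketch, not the Karafka class: `SketchMomFilter` and `Message` are hypothetical, and the partitioner and reducer are injected lambdas standing in for the `partition` and `reduce` helpers inherited from `Base`.

```ruby
# Minimal stand-in for a Kafka message (hypothetical).
Message = Struct.new(:key, :offset)

# Standalone sketch of the mom filtering flow; not Karafka's implementation.
class SketchMomFilter
  # partitioner and reducer are hypothetical callables injected for illustration.
  def initialize(segment_id:, partitioner:, reducer:)
    @segment_id = segment_id
    @partitioner = partitioner
    @reducer = reducer
    @applied = false
  end

  # Removes, in place, the messages that belong to other segments.
  def apply!(messages)
    # Reset on every call, so applied? only describes the latest batch.
    @applied = false

    messages.delete_if do |message|
      remove = @reducer.call(@partitioner.call(message)) != @segment_id
      @applied = true if remove
      remove
    end
  end

  def applied?
    @applied
  end
end

filter = SketchMomFilter.new(
  segment_id: 0,
  partitioner: ->(message) { message.key },
  # Hypothetical reducer: even keys go to segment 0, odd keys to segment 1.
  reducer: ->(key) { key.even? ? 0 : 1 }
)

batch = [Message.new(2, 10), Message.new(3, 11), Message.new(4, 12)]
filter.apply!(batch)
puts "kept offsets: #{batch.map(&:offset).inspect}, applied: #{filter.applied?}"
```

Since `delete_if` mutates the array it receives, the caller sees the filtered batch directly and no new collection is allocated.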

#mark_as_consumed? ⇒ Boolean

Returns false, as mom mode never marks as consumed automatically.

Returns:

  • (Boolean)

    false, as mom mode never marks as consumed automatically



# File 'lib/karafka/pro/processing/parallel_segments/filters/mom.rb', line 69

def mark_as_consumed?
  false
end
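To illustrate why `mark_as_consumed?` returns `false`, here is a hypothetical post-filter step. It assumes, in line with the overview, that an automatic-mode filter may mark the batch's last offset when every message was filtered out. `offset_to_mark`, `AutoLikeFilter`, and `MomLikeFilter` are invented for this sketch and do not correspond to Karafka internals.

```ruby
# Hypothetical post-filter step: decides which offset (if any) to mark
# after a filter has run. Not Karafka's actual coordination logic.
def offset_to_mark(filter, batch_offsets, remaining)
  # A mom filter opts out of any automatic marking.
  return nil unless filter.mark_as_consumed?
  # Nothing was filtered out, so regular processing handles offsets.
  return nil unless filter.applied?
  # Only a fully filtered batch needs the filter to move the offset.
  return nil unless remaining.empty?

  batch_offsets.max
end

# Hypothetical filter doubles exposing only the flags used above.
AutoLikeFilter = Struct.new(:applied) do
  def applied?
    applied
  end

  def mark_as_consumed?
    true
  end
end

MomLikeFilter = Struct.new(:applied) do
  def applied?
    applied
  end

  def mark_as_consumed?
    false
  end
end

offsets = [100, 101, 102]

auto_result = offset_to_mark(AutoLikeFilter.new(true), offsets, [])
mom_result = offset_to_mark(MomLikeFilter.new(true), offsets, [])

puts "auto-like filter marks: #{auto_result.inspect}"
puts "mom filter marks: #{mom_result.inspect}"
```

With the mom filter, the same fully filtered batch yields no offset, which leaves marking entirely to the consumer code.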

#timeout ⇒ nil

Returns nil, since this filter never times out.

Returns:

  • (nil)

    nil, since this filter never times out and therefore has no timeout value to return



# File 'lib/karafka/pro/processing/parallel_segments/filters/mom.rb', line 75

def timeout
  nil
end