Class: Karafka::Pro::Processing::ParallelSegments::Filters::Mom
- Inherits: Base
  - Object
  - Filters::Base
  - Base
  - Karafka::Pro::Processing::ParallelSegments::Filters::Mom
- Defined in: lib/karafka/pro/processing/parallel_segments/filters/mom.rb
Overview
This filter should be used only when manual offset management is enabled. For automatic offset management scenarios use the regular filter instead.
Filter used for handling parallel segments when manual offset management (mom) is enabled. It distributes messages across segments but performs no post-filtering offset state management, because offset handling is left entirely to the user.
With manual offset management, offsets must never be marked automatically, even when all data in a batch is filtered out.
Keeping this flow separate from the regular filter allows for a cleaner implementation and easier debugging of each flow.
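The behavior described above can be sketched in plain Ruby. This is an illustrative stand-in, not the Karafka class: the name `MomLikeFilter`, its constructor keywords, the hash-based messages, and the lambdas used as partitioner and reducer are all assumptions made for this example. It shows the case called out in the overview, where every message in a batch is filtered out and the filter still refuses to mark anything as consumed.

```ruby
# Hypothetical stand-in for illustration only; it does not inherit from
# any Karafka class and its constructor is an assumption.
class MomLikeFilter
  def initialize(segment_id:, partitioner:, reducer:)
    @segment_id = segment_id
    @partitioner = partitioner
    @reducer = reducer
    @applied = false
  end

  # Removes, in place, every message that maps to a different segment
  def apply!(messages)
    @applied = false

    messages.delete_if do |message|
      remove = @reducer.call(@partitioner.call(message)) != @segment_id
      @applied = true if remove
      remove
    end
  end

  # True if the last apply! call removed at least one message
  def applied?
    @applied
  end

  # Offsets are left to the user, so the filter never asks for marking
  def mark_as_consumed?
    false
  end

  # The filter never requests a pause or timeout
  def timeout
    nil
  end
end

filter = MomLikeFilter.new(
  segment_id: 0,
  partitioner: ->(message) { message[:key] },
  reducer: ->(key) { key % 2 }
)

# Both keys are odd, so no message belongs to segment 0
batch = [{ offset: 10, key: 1 }, { offset: 11, key: 3 }]
filter.apply!(batch)

puts batch.empty?             # the whole batch was filtered out
puts filter.applied?          # filtering took place
puts filter.mark_as_consumed? # still no automatic marking
```

Even with an empty batch after filtering, the sketch reports that the filter was applied while leaving every offset decision to the caller.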
Instance Attribute Summary
Attributes inherited from Filters::Base
Instance Method Summary
- #applied? ⇒ Boolean
  True if any messages were filtered out.
- #apply!(messages) ⇒ Object
  Applies the filter to the batch of messages. It removes messages that don’t belong to the current parallel segment group, based on the partitioner and reducer logic, without any offset marking.
- #mark_as_consumed? ⇒ Boolean
  False, as mom mode never marks as consumed automatically.
- #timeout ⇒ nil
  Always nil, since this filter never times out and has no timeout value to return.
Methods inherited from Base
Methods inherited from Filters::Base
#action, #initialize, #marking_cursor, #marking_method
Constructor Details
This class inherits a constructor from Karafka::Pro::Processing::ParallelSegments::Filters::Base
Instance Method Details
#applied? ⇒ Boolean
Returns true if any messages were filtered out.
    # File 'lib/karafka/pro/processing/parallel_segments/filters/mom.rb', line 64
    def applied?
      @applied
    end
#apply!(messages) ⇒ Object
Applies the filter to the batch of messages. It removes messages that don’t belong to the current parallel segment group, based on the partitioner and reducer logic, without any offset marking.
    # File 'lib/karafka/pro/processing/parallel_segments/filters/mom.rb', line 46
    def apply!(messages)
      @applied = false

      # Filter out messages that don't match our segment group
      messages.delete_if do |message|
        message_segment_key = partition(message)
        # Use the reducer to get the target group for this message
        target_segment = reduce(message_segment_key)
        # Remove the message if it doesn't belong to our segment
        remove = target_segment != @segment_id
        @applied = true if remove

        remove
      end
    end
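To make the partition-then-reduce step concrete, here is a minimal plain-Ruby sketch that runs the same keep-or-remove decision for every segment id over one batch. The `:user_id` field, the `segment_count` value, the checksum-modulo reducer, and the helper name `keep_for_segment` are assumptions for illustration, not part of the Karafka API. The sketch uses a non-destructive `reject` so that the same batch can be reused for each segment, whereas the method above mutates the batch with `delete_if`.

```ruby
segment_count = 3

# Assumed partitioner: derive the segment key from a hypothetical :user_id
partitioner = ->(message) { message[:user_id] }
# Assumed reducer: map the key onto one of segment_count segment ids
reducer = ->(key) { key.to_s.sum % segment_count }

# Returns the messages that belong to the given segment; everything whose
# reduced key differs from segment_id is dropped
def keep_for_segment(messages, segment_id, partitioner, reducer)
  messages.reject do |message|
    reducer.call(partitioner.call(message)) != segment_id
  end
end

batch = [
  { offset: 0, user_id: 'a' },
  { offset: 1, user_id: 'b' },
  { offset: 2, user_id: 'a' },
  { offset: 3, user_id: 'c' }
]

per_segment = (0...segment_count).map do |segment_id|
  keep_for_segment(batch, segment_id, partitioner, reducer)
end

per_segment.each_with_index do |messages, segment_id|
  puts "segment #{segment_id}: offsets #{messages.map { |m| m[:offset] }.inspect}"
end
```

Because the reducer is deterministic, each message lands in exactly one segment, and messages sharing a key always land in the same one.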
#mark_as_consumed? ⇒ Boolean
Returns false, as mom mode never marks as consumed automatically.
    # File 'lib/karafka/pro/processing/parallel_segments/filters/mom.rb', line 69
    def mark_as_consumed?
      false
    end
#timeout ⇒ nil
Returns nil. This filter never times out, so there is no timeout value to return.
    # File 'lib/karafka/pro/processing/parallel_segments/filters/mom.rb', line 75
    def timeout
      nil
    end