Class: Karafka::Pro::Processing::ParallelSegments::Filters::Default
- Inherits:
- Base
- Object
- Filters::Base
- Base
- Karafka::Pro::Processing::ParallelSegments::Filters::Default
- Defined in:
- lib/karafka/pro/processing/parallel_segments/filters/default.rb
Overview
This is the default filter that should be used when manual offset management is not enabled. For manual offset management scenarios use the Mom filter instead.
Filter for parallel segments running with automatic offset management. It distributes messages by removing from each batch those that do not belong to the current segment group.
When an entire batch is filtered out, the filter requests that the offsets of the removed messages be marked as consumed, so that offset progression continues even though nothing was processed.
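The distribution described above can be illustrated with a small standalone sketch. It does not use Karafka: `Message`, `PARTITIONER`, `REDUCER`, `SEGMENT_COUNT`, and `split_for_segment` are hypothetical stand-ins, and the modulo-based reducer is an assumption chosen for illustration, not necessarily what the library does by default.

```ruby
# Standalone sketch, NOT Karafka code. Every name here is hypothetical.
Message = Struct.new(:offset, :key)

SEGMENT_COUNT = 2

# Assumed partitioner: extracts the value used to pick a segment.
PARTITIONER = ->(message) { message.key }

# Assumed reducer: maps the partitioning value to a segment id.
REDUCER = ->(value) { value % SEGMENT_COUNT }

# Returns two arrays: the messages the given segment keeps,
# and the messages it filters out.
def split_for_segment(messages, segment_id)
  messages.partition do |message|
    REDUCER.call(PARTITIONER.call(message)) == segment_id
  end
end

batch = [
  Message.new(0, 10),
  Message.new(1, 11),
  Message.new(2, 12),
  Message.new(3, 13)
]

kept_by_zero, dropped_by_zero = split_for_segment(batch, 0)
kept_by_one, _dropped_by_one = split_for_segment(batch, 1)

p kept_by_zero.map(&:offset)    # => [0, 2]
p kept_by_one.map(&:offset)     # => [1, 3]
p dropped_by_zero.map(&:offset) # => [1, 3]
```

Each message lands in exactly one segment, so what one segment filters out is what another segment processes.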
Instance Method Summary
- #applied? ⇒ Boolean
  True if any messages were filtered out.
- #apply!(messages) ⇒ Object
  Applies the filter to the batch of messages. It removes messages that don’t belong to the current parallel segment group, based on the partitioner and reducer logic.
- #cursor ⇒ Object
  Returns the cursor only when all messages were filtered out and should be marked as consumed.
- #mark_as_consumed? ⇒ Boolean
  True if we should mark as consumed (when all were filtered).
- #timeout ⇒ nil
  Always nil, since this filter never times out.
Methods inherited from Base
Methods inherited from Filters::Base
#action, #initialize, #marking_cursor, #marking_method
Constructor Details
This class inherits a constructor from Karafka::Pro::Processing::ParallelSegments::Filters::Base
Instance Method Details
#applied? ⇒ Boolean
Returns true if any messages were filtered out.
# File 'lib/karafka/pro/processing/parallel_segments/filters/default.rb', line 60
def applied?
  @applied
end
#apply!(messages) ⇒ Object
Applies the filter to the batch of messages. It removes messages that don’t belong to the current parallel segment group, based on the partitioner and reducer logic.
# File 'lib/karafka/pro/processing/parallel_segments/filters/default.rb', line 29
def apply!(messages)
  @applied = false
  @all_filtered = false
  @cursor = messages.first

  # Keep track of how many messages we had initially
  initial_size = messages.size

  # Filter out messages that don't match our segment group
  messages.delete_if do |message|
    message_segment_key = partition(message)
    # Use the reducer to get the target group for this message
    target_segment = reduce(message_segment_key)
    # Remove the message if it doesn't belong to our group
    remove = target_segment != @segment_id

    if remove
      @cursor = message
      @applied = true
    end

    remove
  end

  # If all messages were filtered out, we want to mark them as consumed
  @all_filtered = messages.empty? && initial_size.positive?
end
#cursor ⇒ Object
Returns the cursor only when all messages were filtered out and we want to mark them as consumed. Otherwise it returns nil, because a cursor could interfere with other filters.
# File 'lib/karafka/pro/processing/parallel_segments/filters/default.rb', line 77
def cursor
  @all_filtered ? @cursor : nil
end
#mark_as_consumed? ⇒ Boolean
Returns true if we should mark as consumed (when all were filtered).
# File 'lib/karafka/pro/processing/parallel_segments/filters/default.rb', line 65
def mark_as_consumed?
  @all_filtered
end
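How #applied?, #mark_as_consumed?, and #cursor relate after a call to #apply! can be seen in a minimal standalone sketch. `SketchFilter` is a hypothetical class that mirrors the logic shown on this page; its constructor arguments, the use of plain integers as messages, and the modulo reducer are assumptions made so the example runs without Karafka.

```ruby
# Standalone sketch, NOT the Karafka class. SketchFilter and its
# constructor arguments are hypothetical.
class SketchFilter
  def initialize(segment_id:, partitioner:, reducer:)
    @segment_id = segment_id
    @partitioner = partitioner
    @reducer = reducer
  end

  # Removes, in place, the messages that belong to other segments.
  def apply!(messages)
    @applied = false
    @cursor = messages.first
    initial_size = messages.size

    messages.delete_if do |message|
      remove = @reducer.call(@partitioner.call(message)) != @segment_id
      if remove
        @cursor = message # remember the last removed message
        @applied = true
      end
      remove
    end

    @all_filtered = messages.empty? && initial_size.positive?
  end

  def applied?
    @applied
  end

  def mark_as_consumed?
    @all_filtered
  end

  # Exposed only when the whole batch was filtered out.
  def cursor
    @all_filtered ? @cursor : nil
  end
end

filter = SketchFilter.new(
  segment_id: 0,
  partitioner: ->(message) { message }, # integers stand in for messages
  reducer: ->(value) { value % 2 }
)

mixed = [0, 1, 2, 3]
filter.apply!(mixed)
partial = [mixed.dup, filter.applied?, filter.mark_as_consumed?, filter.cursor]

foreign = [1, 3, 5] # nothing here belongs to segment 0
filter.apply!(foreign)
full = [foreign.dup, filter.applied?, filter.mark_as_consumed?, filter.cursor]

p partial # => [[0, 2], true, false, nil]
p full    # => [[], true, true, 5]
```

In the partially filtered batch the cursor stays hidden even though messages were removed. Only when nothing is left to process does the sketch expose the last removed message as the position to mark.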
#timeout ⇒ nil
Returns nil, since this filter never times out and has no timeout value to report.
# File 'lib/karafka/pro/processing/parallel_segments/filters/default.rb', line 71
def timeout
  nil
end