Class: Karafka::Pro::Processing::ParallelSegments::Filters::Default
- Inherits: Base
  - Object
    - Filters::Base
      - Base
        - Karafka::Pro::Processing::ParallelSegments::Filters::Default
- Defined in: lib/karafka/pro/processing/parallel_segments/filters/default.rb
Overview
This is the default filter that should be used when manual offset management is not enabled. For manual offset management scenarios, use the Mom filter instead.
Filter used for handling parallel segments with automatic offset management. Handles message distribution and ensures proper offset management when messages are filtered out during the distribution process.
When operating in automatic offset management mode, this filter takes care of marking offsets of messages that were filtered out during the distribution process to maintain proper offset progression.
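The distribution idea can be sketched in plain Ruby without Karafka. In this minimal sketch, `Message`, `SEGMENT_COUNT`, and the partitioner and reducer lambdas are hypothetical stand-ins, not Karafka's API. Every segment group sees the same batch and keeps only the messages whose reduced key equals its own segment id, so each message lands in exactly one segment and messages sharing a key stay together.

```ruby
# Minimal stand-in for a Kafka message; real Karafka messages carry much more.
Message = Struct.new(:offset, :key)

# Hypothetical number of parallel segments.
SEGMENT_COUNT = 3

# Hypothetical partitioner: use the message key as the segment key.
partitioner = ->(message) { message.key }

# Hypothetical reducer: map a segment key to a segment id in 0...SEGMENT_COUNT.
# String#sum (a byte checksum) is stable across processes, unlike #hash.
reducer = ->(segment_key) { segment_key.to_s.sum % SEGMENT_COUNT }

messages = [
  Message.new(0, "user-1"),
  Message.new(1, "user-2"),
  Message.new(2, "user-1"),
  Message.new(3, "user-3"),
  Message.new(4, "user-2")
]

# Each segment keeps only its own share of the batch, which is what a
# per-segment filter does when it removes everything else.
segments = (0...SEGMENT_COUNT).to_h do |segment_id|
  kept = messages.select { |message| reducer.call(partitioner.call(message)) == segment_id }
  [segment_id, kept.map(&:offset)]
end

segments.each { |id, offsets| puts "segment #{id}: #{offsets.inspect}" }
```

Because the reducer is deterministic, both `user-1` messages (offsets 0 and 2) always end up in the same segment.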
Instance Method Summary
- #applied? ⇒ Boolean
  True if any messages were filtered out.
- #apply!(messages) ⇒ Object
  Applies the filter to the batch of messages. It removes messages that don’t belong to the current parallel segment group based on the partitioner and reducer logic.
- #cursor ⇒ Object
  Only returns the cursor if all messages were filtered out and should be marked as consumed.
- #mark_as_consumed? ⇒ Boolean
  True if we should mark as consumed (when all were filtered).
- #timeout ⇒ nil
  Since this filter never times out, it does not return a timeout value.
Methods inherited from Base
Methods inherited from Filters::Base
#action, #initialize, #marking_cursor, #marking_method
Constructor Details
This class inherits a constructor from Karafka::Pro::Processing::ParallelSegments::Filters::Base
Instance Method Details
#applied? ⇒ Boolean
Returns true if any messages were filtered out.
    # File 'lib/karafka/pro/processing/parallel_segments/filters/default.rb', line 77
    def applied?
      @applied
    end
#apply!(messages) ⇒ Object
Applies the filter to the batch of messages. It removes messages that don’t belong to the current parallel segment group based on the partitioner and reducer logic.
    # File 'lib/karafka/pro/processing/parallel_segments/filters/default.rb', line 46
    def apply!(messages)
      @applied = false
      @all_filtered = false
      @cursor = messages.first

      # Keep track of how many messages we had initially
      initial_size = messages.size

      # Filter out messages that don't match our segment group
      messages.delete_if do |message|
        message_segment_key = partition(message)

        # Use the reducer to get the target group for this message
        target_segment = reduce(message_segment_key)

        # Remove the message if it doesn't belong to our group
        remove = target_segment != @segment_id

        if remove
          @cursor = message
          @applied = true
        end

        remove
      end

      # If all messages were filtered out, we want to mark them as consumed
      @all_filtered = messages.empty? && initial_size.positive?
    end
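To make the state changes of `apply!` concrete, here is a minimal Karafka-free sketch. `SketchDefaultFilter`, `Message`, and the partitioner/reducer lambdas are hypothetical; the class only mirrors the logic of `apply!`, `applied?`, `mark_as_consumed?`, `cursor`, and `timeout` shown on this page, and it calls the lambdas directly instead of the `partition`/`reduce` calls used in the source. It is not the real class.

```ruby
# Minimal stand-in for a Kafka message (offset + key only).
Message = Struct.new(:offset, :key)

# Hypothetical class mirroring the documented filter logic; not Karafka's class.
class SketchDefaultFilter
  def initialize(segment_id:, partitioner:, reducer:)
    @segment_id = segment_id
    @partitioner = partitioner
    @reducer = reducer
    @applied = false
    @all_filtered = false
    @cursor = nil
  end

  def apply!(messages)
    @applied = false
    @all_filtered = false
    @cursor = messages.first
    initial_size = messages.size

    # Mutates the given array: messages of other segments are removed in place.
    messages.delete_if do |message|
      target_segment = @reducer.call(@partitioner.call(message))
      remove = target_segment != @segment_id

      if remove
        # Remember the last filtered-out message as the potential marking point.
        @cursor = message
        @applied = true
      end

      remove
    end

    @all_filtered = messages.empty? && initial_size.positive?
  end

  def applied?
    @applied
  end

  def mark_as_consumed?
    @all_filtered
  end

  # Expose the cursor only when the whole batch was filtered out.
  def cursor
    @all_filtered ? @cursor : nil
  end

  def timeout
    nil
  end
end

filter = SketchDefaultFilter.new(
  segment_id: 0,
  partitioner: ->(message) { message.key },
  reducer: ->(key) { key % 2 } # even keys -> segment 0, odd keys -> segment 1
)

# Mixed batch: the message with the odd key belongs to segment 1.
batch = [Message.new(10, 2), Message.new(11, 3), Message.new(12, 4)]
filter.apply!(batch)
p batch.map(&:offset)      # => [10, 12]
p filter.applied?          # => true
p filter.mark_as_consumed? # => false
p filter.cursor            # => nil

# Batch that belongs entirely to segment 1, so everything is filtered out.
odd_batch = [Message.new(20, 1), Message.new(21, 3)]
filter.apply!(odd_batch)
p odd_batch.empty?         # => true
p filter.mark_as_consumed? # => true
p filter.cursor.offset     # => 21
```

Because `delete_if` mutates its receiver, the caller's batch shrinks in place. In the mixed batch, the last filtered-out message (offset 11) sits between two kept messages, yet `cursor` still returns nil because not everything was filtered.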
#cursor ⇒ Object
Only returns the cursor when all messages were filtered out and should be marked as consumed. Otherwise, returning it could interfere with other filters.
    # File 'lib/karafka/pro/processing/parallel_segments/filters/default.rb', line 94
    def cursor
      @all_filtered ? @cursor : nil
    end
#mark_as_consumed? ⇒ Boolean
Returns true if we should mark as consumed (when all were filtered).
    # File 'lib/karafka/pro/processing/parallel_segments/filters/default.rb', line 82
    def mark_as_consumed?
      @all_filtered
    end
#timeout ⇒ nil
Returns nil, since this filter never times out.
    # File 'lib/karafka/pro/processing/parallel_segments/filters/default.rb', line 88
    def timeout
      nil
    end