Class: Karafka::Pro::Processing::ParallelSegments::Filters::Default

Inherits:
Base
  • Object
Defined in:
lib/karafka/pro/processing/parallel_segments/filters/default.rb

Overview

Note:

This is the default filter that should be used when manual offset management is not enabled. For manual offset management scenarios use the Mom filter instead.

Filter used for handling parallel segments with automatic offset management. Handles message distribution and ensures proper offset management when messages are filtered out during the distribution process.

When operating in automatic offset management mode, this filter takes care of marking offsets of messages that were filtered out during the distribution process to maintain proper offset progression.

Instance Method Summary

Methods inherited from Base

#initialize

Methods inherited from Filters::Base

#action, #initialize, #marking_cursor, #marking_method

Constructor Details

This class inherits a constructor from Karafka::Pro::Processing::ParallelSegments::Filters::Base

Instance Method Details

#applied? ⇒ Boolean

Returns true if any messages were filtered out.

Returns:

  • (Boolean)

    true if any messages were filtered out



# File 'lib/karafka/pro/processing/parallel_segments/filters/default.rb', line 60

def applied?
  @applied
end

#apply!(messages) ⇒ Object

Applies the filter to the batch of messages. It removes messages that do not belong to the current parallel segment group, based on the partitioner and reducer logic.

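Parameters:

  • messages

    the batch of messages to filter; messages that belong to other segments are removed from it in place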



# File 'lib/karafka/pro/processing/parallel_segments/filters/default.rb', line 29

def apply!(messages)
  @applied = false
  @all_filtered = false
  @cursor = messages.first

  # Keep track of how many messages we had initially
  initial_size = messages.size

  # Filter out messages that don't match our segment group
  messages.delete_if do |message|
    message_segment_key = partition(message)

    # Use the reducer to get the target group for this message
    target_segment = reduce(message_segment_key)

    # Remove the message if it doesn't belong to our group
    remove = target_segment != @segment_id

    if remove
      @cursor = message
      @applied = true
    end

    remove
  end

  # If all messages were filtered out, we want to mark them as consumed
  @all_filtered = messages.empty? && initial_size.positive?
end
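The filtering step of apply! can be tried outside Karafka with a small stand-in. The following is a minimal sketch, not the library's implementation: SketchMessage, SketchSegmentFilter and offsets_per_segment are hypothetical names, the partitioner simply reads a key field, and the reducer (byte sum of the key modulo the segment count) is an assumption chosen for illustration. It shows how the same batch, handed to every segment, ends up split so that each segment keeps only its own messages.

```ruby
# Hypothetical stand-in for a Karafka message; only the fields the sketch needs.
SketchMessage = Struct.new(:offset, :key)

# Simplified re-creation of the filtering logic, for illustration only.
class SketchSegmentFilter
  def initialize(segment_id:, partitioner:, reducer:)
    @segment_id = segment_id
    @partitioner = partitioner
    @reducer = reducer
  end

  # Same removal rule as apply! above, without the cursor bookkeeping.
  def apply!(messages)
    @applied = false

    messages.delete_if do |message|
      remove = @reducer.call(@partitioner.call(message)) != @segment_id
      @applied = true if remove
      remove
    end
  end

  def applied?
    @applied
  end
end

# Runs one filter per segment over its own copy of the batch and returns
# the offsets each segment keeps.
def offsets_per_segment(batch, segment_count)
  partitioner = ->(message) { message.key }
  # Assumed reducer: byte sum of the key modulo the number of segments.
  reducer = ->(segment_key) { segment_key.to_s.sum % segment_count }

  (0...segment_count).map do |segment_id|
    filter = SketchSegmentFilter.new(
      segment_id: segment_id,
      partitioner: partitioner,
      reducer: reducer
    )
    own_copy = batch.dup # delete_if mutates, so each segment gets a copy
    filter.apply!(own_copy)
    own_copy.map(&:offset)
  end
end

batch = [
  SketchMessage.new(0, 'a'), # 'a'.sum == 97  -> segment 1
  SketchMessage.new(1, 'b'), # 'b'.sum == 98  -> segment 0
  SketchMessage.new(2, 'a'),
  SketchMessage.new(3, 'd')  # 'd'.sum == 100 -> segment 0
]

p offsets_per_segment(batch, 2) # => [[1, 3], [0, 2]]
```

Every offset appears in exactly one segment, and messages with the same key always land in the same segment, which is the property the partitioner and reducer pair is there to provide.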

#cursor ⇒ Object

Returns the cursor only when every message in the batch was filtered out and the batch should therefore be marked as consumed. In all other cases it returns nil, because a cursor could otherwise interfere with other filters.



# File 'lib/karafka/pro/processing/parallel_segments/filters/default.rb', line 77

def cursor
  @all_filtered ? @cursor : nil
end

#mark_as_consumed? ⇒ Boolean

Returns true if the batch should be marked as consumed, which is the case when all of its messages were filtered out.

Returns:

  • (Boolean)

    true if the batch should be marked as consumed (all messages were filtered out)



# File 'lib/karafka/pro/processing/parallel_segments/filters/default.rb', line 65

def mark_as_consumed?
  @all_filtered
end
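How applied?, mark_as_consumed? and cursor relate to each other is easiest to see across three batches: nothing removed, some messages removed, and everything removed. The sketch below is a hypothetical stand-in (DemoMessage, DemoDefaultFilter and filter_state are not part of Karafka) that copies the state handling from the methods above and, as a simplifying assumption, stores the target segment directly on each message instead of deriving it with a partitioner and reducer.

```ruby
# Hypothetical message stand-in; `segment` replaces the partitioner/reducer result.
DemoMessage = Struct.new(:offset, :segment)

# Mirrors the state handling of the filter shown above, for illustration only.
class DemoDefaultFilter
  def initialize(segment_id)
    @segment_id = segment_id
  end

  def apply!(messages)
    @applied = false
    @all_filtered = false
    @cursor = messages.first
    initial_size = messages.size

    messages.delete_if do |message|
      remove = message.segment != @segment_id

      if remove
        @cursor = message
        @applied = true
      end

      remove
    end

    @all_filtered = messages.empty? && initial_size.positive?
  end

  def applied?
    @applied
  end

  def mark_as_consumed?
    @all_filtered
  end

  def cursor
    @all_filtered ? @cursor : nil
  end
end

# Builds a batch from a list of target segments, filters it for segment 0 and
# returns [applied?, mark_as_consumed?, cursor offset or nil].
def filter_state(segments)
  messages = segments.each_with_index.map do |segment, offset|
    DemoMessage.new(offset, segment)
  end

  filter = DemoDefaultFilter.new(0)
  filter.apply!(messages)

  [filter.applied?, filter.mark_as_consumed?, filter.cursor&.offset]
end

p filter_state([0, 0, 0]) # nothing removed => [false, false, nil]
p filter_state([0, 1, 0]) # some removed    => [true, false, nil]
p filter_state([1, 1, 1]) # all removed     => [true, true, 2]
```

In the all-removed case the cursor is the last message that was filtered out, which is also the last message of the batch, so marking it as consumed moves the offset past the whole batch. An empty input batch leaves all three values at false, false and nil because of the initial_size.positive? guard.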

#timeout ⇒ nil

Returns nil, because this filter never times out.

Returns:

  • (nil)

    always nil, because this filter never times out



# File 'lib/karafka/pro/processing/parallel_segments/filters/default.rb', line 71

def timeout
  nil
end