Class: Karafka::Pro::Processing::Filters::Base
- Inherits:
- Object
  - Karafka::Pro::Processing::Filters::Base
- Includes:
- Core::Helpers::Time
- Defined in:
- lib/karafka/pro/processing/filters/base.rb
Overview
Base for all the filters. All filters (including custom) need to use this API.
Because filters can limit data in ways that require pausing or seeking (throttling, for example), the API is not just “remove some things from the batch”; it also provides ways to control the post-filtering operations that may be needed.
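A minimal sketch of a custom filter may help illustrate the contract described above. It does not load Karafka: `SketchBaseFilter` is a hypothetical stand-in that mirrors only the defaults documented on this page, and `SketchMessage` and `EmptyPayloadFilter` are illustrative names. It also assumes that `apply!` mutates the batch in place, as the bang in the method name suggests.

```ruby
# Stand-in for Karafka::Pro::Processing::Filters::Base (assumption: mirrors
# only the defaults documented on this page, so the sketch runs standalone).
class SketchBaseFilter
  attr_reader :cursor

  def initialize
    @applied = false
    @cursor = nil
  end

  def apply!(_messages)
    raise NotImplementedError, 'Implement in a subclass'
  end

  def applied?
    @applied
  end

  def action
    :skip
  end
end

# Hypothetical message shape; only #offset and #payload are used here.
SketchMessage = Struct.new(:offset, :payload)

# Hypothetical filter that removes messages with an empty payload.
class EmptyPayloadFilter < SketchBaseFilter
  def apply!(messages)
    @applied = false

    # Assumption: the batch is filtered in place.
    messages.delete_if do |message|
      drop = message.payload.to_s.empty?
      @applied = true if drop
      drop
    end
  end
end

batch = [
  SketchMessage.new(0, 'a'),
  SketchMessage.new(1, ''),
  SketchMessage.new(2, 'c')
]

filter = EmptyPayloadFilter.new
filter.apply!(batch)

puts batch.map(&:offset).inspect # prints [0, 2]
puts filter.applied?             # prints true
puts filter.action               # prints skip
```

Because this filter only removes messages, it keeps the default `:skip` action and leaves the cursor as nil; no pause or seek is requested.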
Direct Known Subclasses
Delayer, Expirer, InlineInsightsDelayer, Throttler, VirtualLimiter, ParallelSegments::Filters::Base
Instance Attribute Summary
- #cursor ⇒ Karafka::Messages::Message? (readonly)
  The message to use as a cursor for pausing or seeking, or nil if not applicable.
Instance Method Summary
- #action ⇒ Symbol
  Filter post-execution action on consumer.
- #applied? ⇒ Boolean
  Whether this filter changed the messages in any way.
- #apply!(messages) ⇒ Object
- #initialize ⇒ Base (constructor)
  Initializes the filter as not yet applied.
- #mark_as_consumed? ⇒ Boolean
  Whether the cursor value should be used to mark as consumed.
- #marking_cursor ⇒ Karafka::Messages::Message?
  Cursor message for marking or nil if no marking.
- #marking_method ⇒ Symbol
  :mark_as_consumed or :mark_as_consumed!.
- #timeout ⇒ Integer?
  Default timeout for pausing (if applicable) or nil if not.
Constructor Details
#initialize ⇒ Base
Initializes the filter as not yet applied.

    # File 'lib/karafka/pro/processing/filters/base.rb', line 41
    def initialize
      @applied = false
      @cursor = nil
    end
Instance Attribute Details
#cursor ⇒ Karafka::Messages::Message? (readonly)
Returns the message to use as a cursor for pausing or seeking, or nil if not applicable.

    # File 'lib/karafka/pro/processing/filters/base.rb', line 36
    def cursor
      @cursor
    end
Instance Method Details
#action ⇒ Symbol
Returns filter post-execution action on consumer. Either :skip, :pause or :seek.

    # File 'lib/karafka/pro/processing/filters/base.rb', line 54
    def action
      :skip
    end
#applied? ⇒ Boolean
Returns whether this filter changed the messages in any way.

    # File 'lib/karafka/pro/processing/filters/base.rb', line 59
    def applied?
      @applied
    end
#apply!(messages) ⇒ Object

    # File 'lib/karafka/pro/processing/filters/base.rb', line 48
    def apply!(messages)
      raise NotImplementedError, 'Implement in a subclass'
    end
#mark_as_consumed? ⇒ Boolean
Returns whether the cursor value should be used to mark as consumed. If any of the filters returns true, the lowest applicable cursor value (if any) is returned.

    # File 'lib/karafka/pro/processing/filters/base.rb', line 72
    def mark_as_consumed?
      false
    end
#marking_cursor ⇒ Karafka::Messages::Message?
Returns cursor message for marking or nil if no marking.

    # File 'lib/karafka/pro/processing/filters/base.rb', line 84
    def marking_cursor
      cursor
    end
#marking_method ⇒ Symbol
Returns :mark_as_consumed or :mark_as_consumed!. Applicable only if marking is requested.

    # File 'lib/karafka/pro/processing/filters/base.rb', line 78
    def marking_method
      :mark_as_consumed
    end
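The three marking-related methods can be overridden together. The sketch below is one possible shape, not taken from Karafka's own filters: `SketchBaseFilter` is a hypothetical stand-in mirroring the defaults documented here, and `SketchMessage` and `BelowOffsetFilter` are illustrative names. It drops a leading run of messages and asks for the last dropped one to be marked; how the consumer acts on these values is not shown on this page.

```ruby
# Stand-in for Karafka::Pro::Processing::Filters::Base (assumption: mirrors
# only the marking-related defaults documented on this page).
class SketchBaseFilter
  attr_reader :cursor

  def initialize
    @applied = false
    @cursor = nil
  end

  def applied?
    @applied
  end

  def mark_as_consumed?
    false
  end

  def marking_method
    :mark_as_consumed
  end

  def marking_cursor
    cursor
  end
end

# Hypothetical message shape; only #offset is used here.
SketchMessage = Struct.new(:offset)

# Hypothetical filter: drops messages below a minimum offset and requests
# that the last dropped message be marked as consumed.
class BelowOffsetFilter < SketchBaseFilter
  def initialize(min_offset)
    super()
    @min_offset = min_offset
  end

  def apply!(messages)
    @applied = false
    @cursor = nil

    messages.delete_if do |message|
      drop = message.offset < @min_offset
      if drop
        @applied = true
        @cursor = message # ends up as the last dropped message
      end
      drop
    end
  end

  # Request marking only when something was actually removed.
  def mark_as_consumed?
    applied?
  end

  # Blocking variant, chosen here purely for illustration.
  def marking_method
    :mark_as_consumed!
  end
end

batch = (10..14).map { |offset| SketchMessage.new(offset) }
filter = BelowOffsetFilter.new(12)
filter.apply!(batch)

puts batch.map(&:offset).inspect  # prints [12, 13, 14]
puts filter.mark_as_consumed?     # prints true
puts filter.marking_cursor.offset # prints 11
```

The sketch leaves `marking_cursor` at its default, so it returns the same message as `cursor`.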
#timeout ⇒ Integer?
Please do not return 0 when your filter is not pausing as it may interact with other filters that want to pause.
Returns default timeout for pausing (if applicable) or nil if not.

    # File 'lib/karafka/pro/processing/filters/base.rb', line 66
    def timeout
      nil
    end
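To illustrate how `action`, `cursor` and `timeout` can work together, here is a rough sketch of a filter that caps the batch size and requests a pause. It is not the built-in Throttler: `SketchBaseFilter` is a hypothetical stand-in mirroring the defaults documented on this page, and `SketchMessage`, `BatchCapFilter`, `limit` and `pause_ms` are illustrative names. Treating the timeout as milliseconds is also an assumption.

```ruby
# Stand-in for Karafka::Pro::Processing::Filters::Base (assumption: mirrors
# only the defaults documented on this page, so the sketch runs standalone).
class SketchBaseFilter
  attr_reader :cursor

  def initialize
    @applied = false
    @cursor = nil
  end

  def applied?
    @applied
  end

  def action
    :skip
  end

  def timeout
    nil
  end
end

# Hypothetical message shape; only #offset is used here.
SketchMessage = Struct.new(:offset)

# Hypothetical filter: keeps at most `limit` messages per batch and asks the
# consumer to pause, resuming from the first message that was cut off.
class BatchCapFilter < SketchBaseFilter
  def initialize(limit, pause_ms)
    super()
    @limit = limit
    @pause_ms = pause_ms
  end

  def apply!(messages)
    @applied = false
    @cursor = nil
    return if messages.size <= @limit

    # The first message over the limit becomes the cursor.
    @cursor = messages[@limit]
    messages.slice!(@limit..-1)
    @applied = true
  end

  def action
    applied? ? :pause : :skip
  end

  # Returns nil rather than 0 when not pausing, per the note above.
  def timeout
    applied? ? @pause_ms : nil
  end
end

batch = (0..4).map { |offset| SketchMessage.new(offset) }
filter = BatchCapFilter.new(3, 500)
filter.apply!(batch)

puts batch.map(&:offset).inspect # prints [0, 1, 2]
puts filter.action               # prints pause
puts filter.cursor.offset        # prints 3
puts filter.timeout              # prints 500
```

When the batch fits within the limit, the filter reports `:skip` with a nil timeout, so it does not interfere with other filters that want to pause.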