Class: Karafka::Pro::Processing::Filters::Base

Inherits:
Object
Includes:
Core::Helpers::Time
Defined in:
lib/karafka/pro/processing/filters/base.rb

Overview

Base class for all filters. Every filter, including custom ones, must implement this API.

Because filters can limit data in ways that require pausing or seeking (throttling, for example), the API does more than "remove some things from the batch": it also provides ways to control the post-filtering operations that may be needed.


Constructor Details

#initialize ⇒ Base

Initializes the filter as not yet applied



# File 'lib/karafka/pro/processing/filters/base.rb', line 24

def initialize
  @applied = false
  @cursor = nil
end

Instance Attribute Details

#cursor ⇒ Karafka::Messages::Message? (readonly)

Returns the message to use as the cursor for pausing or seeking, or nil if not applicable.

Returns:

  • (Karafka::Messages::Message, nil)

    the message to use as the cursor for pausing or seeking, or nil if not applicable



# File 'lib/karafka/pro/processing/filters/base.rb', line 19

def cursor
  @cursor
end

Instance Method Details

#action ⇒ Symbol

Returns the post-execution action the filter requests on the consumer: `:skip`, `:pause` or `:seek`.

Returns:

  • (Symbol)

    the post-execution action the filter requests on the consumer: `:skip`, `:pause` or `:seek`



# File 'lib/karafka/pro/processing/filters/base.rb', line 37

def action
  :skip
end
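
As a hedged illustration of a non-default action, the sketch below caps a batch at a fixed size and requests a `:seek` back to the first rejected message. `StandInBase`, `Message` and `BatchCapFilter` are hypothetical names. The stand-in only mirrors the defaults documented on this page so the snippet runs without Karafka; a real filter would inherit from Karafka::Pro::Processing::Filters::Base.

```ruby
# Stand-in mirroring the documented Base defaults (assumption: used only so
# this sketch runs without Karafka installed).
class StandInBase
  attr_reader :cursor

  def initialize
    @applied = false
    @cursor = nil
  end

  def applied?
    @applied
  end

  def action
    :skip
  end
end

# Hypothetical message type; real batches hold Karafka::Messages::Message.
Message = Struct.new(:offset)

# Hypothetical filter: keeps at most `limit` messages per batch.
class BatchCapFilter < StandInBase
  def initialize(limit)
    super()
    @limit = limit
  end

  def apply!(messages)
    # Reset state on every run (assumption: an instance sees many batches).
    @applied = false
    @cursor = nil
    return if messages.size <= @limit

    @applied = true
    # The first rejected message is where consumption should resume.
    @cursor = messages[@limit]
    # Drop everything from the cap onwards, in place.
    messages.slice!(@limit, messages.size)
  end

  # Ask for a seek only when something was actually cut off.
  def action
    applied? ? :seek : :skip
  end
end
```

Returning `:skip` whenever nothing was removed keeps the filter neutral for batches it did not touch.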

#applied? ⇒ Boolean

Returns whether this filter changed the messages in any way.

Returns:

  • (Boolean)

    whether this filter changed the messages in any way



# File 'lib/karafka/pro/processing/filters/base.rb', line 42

def applied?
  @applied
end

#apply!(messages) ⇒ Object

Parameters:

  • messages (Array<Karafka::Messages::Message>)

    array of messages. Keep in mind that it may already be partial because previous filters have run.

Raises:

  • (NotImplementedError)


# File 'lib/karafka/pro/processing/filters/base.rb', line 31

def apply!(messages)
  raise NotImplementedError, 'Implement in a subclass'
end
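
A minimal sketch of what a subclass implementation of `#apply!` might look like, assuming the filter is expected to mutate the given array in place and to record whether it changed anything. `StandInBase`, `Message` and `EmptyPayloadFilter` are hypothetical names; the stand-in replaces Karafka::Pro::Processing::Filters::Base only so the snippet runs on its own.

```ruby
# Stand-in mirroring the documented Base defaults (assumption: used only so
# this sketch runs without Karafka installed).
class StandInBase
  attr_reader :cursor

  def initialize
    @applied = false
    @cursor = nil
  end

  def applied?
    @applied
  end

  def action
    :skip
  end
end

# Hypothetical message type; real batches hold Karafka::Messages::Message.
Message = Struct.new(:offset, :payload)

# Hypothetical filter that drops messages with an empty payload.
class EmptyPayloadFilter < StandInBase
  def apply!(messages)
    # Reset state on every run (assumption: an instance sees many batches).
    @applied = false
    @cursor = nil

    # Remove in place so that later filters only see what is left.
    messages.delete_if do |message|
      next false unless message.payload.to_s.empty?

      @applied = true
      true
    end
  end
end
```

This filter never sets a cursor and keeps the default `:skip` action, since dropping messages does not require pausing or seeking.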

#mark_as_consumed? ⇒ Boolean

Returns whether the cursor value should be used to mark as consumed. If any of the filters returns true, the lowest applicable cursor value (if any) is used.

Returns:

  • (Boolean)

    whether the cursor value should be used to mark as consumed. If any of the filters returns true, the lowest applicable cursor value (if any) is used



# File 'lib/karafka/pro/processing/filters/base.rb', line 55

def mark_as_consumed?
  false
end
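
One way to read "lowest applicable cursor value" is sketched below: among the filters that request marking, pick the marking cursor with the smallest offset. This is an illustration only, not Karafka's internal code; `AppliedFilter`, `Message` and `lowest_marking_cursor` are hypothetical names.

```ruby
# Hypothetical message type; only the offset matters for this sketch.
Message = Struct.new(:offset)

# Hypothetical stand-in for an already applied filter, exposing only the two
# documented readers used here.
class AppliedFilter
  attr_reader :marking_cursor

  def initialize(mark, marking_cursor)
    @mark = mark
    @marking_cursor = marking_cursor
  end

  def mark_as_consumed?
    @mark
  end
end

# Hypothetical helper (not part of Karafka): returns the lowest-offset marking
# cursor among filters that request marking, or nil when there is none.
def lowest_marking_cursor(filters)
  filters
    .select(&:mark_as_consumed?)
    .map(&:marking_cursor)
    .compact
    .min_by(&:offset)
end
```

Choosing the lowest offset is the conservative option: no filter's marking request moves the committed position past a message that another filter still wants to mark from.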

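#marking_cursor ⇒ Karafka::Messages::Message?

Returns the cursor message for marking, or nil if no marking is requested.

Returns:

  • (Karafka::Messages::Message, nil)

    the cursor message for marking, or nil if no marking is requested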



# File 'lib/karafka/pro/processing/filters/base.rb', line 67

def marking_cursor
  cursor
end

#marking_method ⇒ Symbol

Returns `:mark_as_consumed` or `:mark_as_consumed!`. Applicable only if marking is requested.

Returns:

  • (Symbol)

    `:mark_as_consumed` or `:mark_as_consumed!`. Applicable only if marking is requested



# File 'lib/karafka/pro/processing/filters/base.rb', line 61

def marking_method
  :mark_as_consumed
end
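
The three marking methods can be combined as in the following sketch, which assumes a filter that drops tombstones (nil payloads) and requests marking only when the whole batch was filtered out, so that the offset can still advance. `StandInBase`, `Message` and `TombstoneFilter` are hypothetical names; the stand-in mirrors the documented defaults and replaces the real Base only so the snippet runs without Karafka.

```ruby
# Stand-in mirroring the documented Base defaults (assumption: used only so
# this sketch runs without Karafka installed).
class StandInBase
  attr_reader :cursor

  def initialize
    @applied = false
    @cursor = nil
  end

  def applied?
    @applied
  end

  def mark_as_consumed?
    false
  end

  def marking_method
    :mark_as_consumed
  end

  def marking_cursor
    cursor
  end
end

# Hypothetical message type; real batches hold Karafka::Messages::Message.
Message = Struct.new(:offset, :payload)

# Hypothetical filter: removes tombstones and, when nothing is left to
# process, asks for the last message of the batch to be marked.
class TombstoneFilter < StandInBase
  def apply!(messages)
    @applied = false
    @cursor = nil
    last = messages.last
    size_before = messages.size

    messages.delete_if { |message| message.payload.nil? }

    @applied = messages.size < size_before
    # Remember a cursor only if the whole batch was filtered out.
    @cursor = last if @applied && messages.empty?
  end

  # Request marking only when a cursor was recorded.
  def mark_as_consumed?
    !@cursor.nil?
  end

  # Blocking variant; the default is :mark_as_consumed.
  def marking_method
    :mark_as_consumed!
  end
end
```

`#marking_cursor` is left at its default here, so the message stored in `@cursor` is also the one used for marking.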

#timeout ⇒ Integer?

Note:

Please do not return `0` when your filter is not pausing, as it may interact with other filters that want to pause.

Returns the default timeout for pausing (if applicable) or nil if not.

Returns:

  • (Integer, nil)

    the default timeout for pausing (if applicable) or nil if not



# File 'lib/karafka/pro/processing/filters/base.rb', line 49

def timeout
  nil
end
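
A hedged sketch of a pausing filter follows. It holds back messages younger than a given delay, requests `:pause`, and returns nil rather than `0` from `#timeout` when it has nothing to pause for. It assumes the timeout is expressed in milliseconds and that messages expose a `timestamp`; `StandInBase`, `TimedMessage`, `DelayFilter` and the injectable `now:` clock are hypothetical and exist only to keep the snippet self-contained and deterministic.

```ruby
# Stand-in mirroring the documented Base defaults (assumption: used only so
# this sketch runs without Karafka installed).
class StandInBase
  attr_reader :cursor

  def initialize
    @applied = false
    @cursor = nil
  end

  def applied?
    @applied
  end

  def action
    :skip
  end

  def timeout
    nil
  end
end

# Hypothetical message type carrying a creation time.
TimedMessage = Struct.new(:offset, :timestamp)

# Hypothetical filter: messages younger than `delay_ms` are held back.
class DelayFilter < StandInBase
  # Pause timeout (assumed milliseconds), or nil when no pause is needed.
  attr_reader :timeout

  def initialize(delay_ms, now: -> { Time.now })
    super()
    @delay_ms = delay_ms
    @now = now
    @timeout = nil
  end

  def apply!(messages)
    @applied = false
    @cursor = nil
    @timeout = nil
    # Messages created after this point in time are still too young.
    border = @now.call - @delay_ms / 1_000.0

    messages.delete_if do |message|
      too_young = message.timestamp > border
      if too_young
        @applied = true
        # The first held-back message is where processing should resume.
        @cursor ||= message
      end
      too_young
    end

    # Time left until the cursor message is old enough; always positive here.
    @timeout = ((@cursor.timestamp - border) * 1_000).ceil if @cursor
  end

  def action
    applied? ? :pause : :skip
  end
end
```

Computing the timeout inside `apply!` keeps `#timeout` stable between calls, and leaving it nil for untouched batches follows the note above about not returning `0`.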