Module: Karafka::Pro::Processing::Strategies::Ftr::Default
- Includes:
- Default
- Included in:
- Dlq::Ftr, Dlq::FtrLrjMom, Dlq::FtrMom, Vp, Lrj::Ftr, Mom::Ftr
- Defined in:
- lib/karafka/pro/processing/strategies/ftr/default.rb
Overview
Only filtering enabled
Constant Summary collapse
- FEATURES =
Just filtering enabled
%i[ filtering ].freeze
Instance Method Summary collapse
-
#handle_after_consume ⇒ Object
Standard flow without any features.
-
#handle_idle ⇒ Object
Empty run when running on idle means we need to filter.
-
#handle_post_filtering ⇒ Boolean
Throttles by pausing for an expected time period if throttling is needed or seeks in case the throttle expired.
Methods included from Default
#handle_before_consume, #handle_before_enqueue, #handle_consume, #handle_revoked
Methods included from Karafka::Processing::Strategies::Default
#commit_offsets, #commit_offsets!, #handle_before_consume, #handle_before_enqueue, #handle_consume, #handle_revoked, #handle_shutdown, #mark_as_consumed, #mark_as_consumed!
Methods included from Karafka::Processing::Strategies::Base
#handle_before_consume, #handle_before_enqueue, #handle_consume, #handle_revoked, #handle_shutdown
Instance Method Details
#handle_after_consume ⇒ Object
Standard flow without any features
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/karafka/pro/processing/strategies/ftr/default.rb', line 35 def handle_after_consume coordinator.on_finished do || return if revoked? if coordinator.success? coordinator.pause_tracker.reset # Do not mark last message if pause happened. This prevents a scenario where # pause is overridden upon rebalance by marking return if coordinator.manual_pause? mark_as_consumed() handle_post_filtering else retry_after_pause end end end |
#handle_idle ⇒ Object
Empty run when running on idle means we need to filter
30 31 32 |
# File 'lib/karafka/pro/processing/strategies/ftr/default.rb', line 30 def handle_idle handle_post_filtering end |
#handle_post_filtering ⇒ Boolean
Throttles by pausing for an expected time period if throttling is needed or seeks in case the throttle expired. Throttling may expire because we throttle before processing starts and we need to compensate for processing time. It may turn out that we don’t have to pause but we need to move the offset because we skipped some messages due to throttling filtering.
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/karafka/pro/processing/strategies/ftr/default.rb', line 61 def handle_post_filtering filter = coordinator.filter # We pick the timeout before the action because every action takes time. This time # may then mean we end up having throttle time equal to zero when pause is needed # and this should not happen throttle_timeout = filter.timeout case filter.action when :skip nil when :seek # User direct actions take priority over automatic operations # If we've already seeked we can just resume operations, nothing extra needed return resume if coordinator.manual_seek? = filter.cursor Karafka.monitor.instrument( 'filtering.seek', caller: self, message: ) do seek(.offset, false) end resume when :pause # User direct actions take priority over automatic operations return nil if coordinator.manual_pause? = filter.cursor Karafka.monitor.instrument( 'filtering.throttled', caller: self, message: , timeout: throttle_timeout ) do pause(.offset, throttle_timeout, false) end else raise Karafka::Errors::UnsupportedCaseError filter.action end end |