Module: Karafka::Pro::Processing::Strategies::Aj::DlqMomVp

Includes:
Default, Dlq::Vp, Vp::Default
Included in:
DlqFtrLrjMomVp, DlqFtrMomVp, DlqLrjMomVp
Defined in:
lib/karafka/pro/processing/strategies/aj/dlq_mom_vp.rb

Overview

ActiveJob enabled Manual offset management enabled Virtual Partitions enabled

Constant Summary collapse

FEATURES =

Features for this strategy

%i[
  active_job
  dead_letter_queue
  manual_offset_management
  virtual_partitions
].freeze

Instance Method Summary collapse

Methods included from Vp::Default

#collapsed?, #failing?, #mark_as_consumed, #mark_as_consumed!

Methods included from Default

#handle_before_consume, #handle_before_enqueue, #handle_consume, #handle_revoked

Methods included from Karafka::Processing::Strategies::Default

#commit_offsets, #commit_offsets!, #handle_before_consume, #handle_before_enqueue, #handle_consume, #handle_idle, #handle_revoked, #handle_shutdown, #mark_as_consumed, #mark_as_consumed!

Methods included from Karafka::Processing::Strategies::Base

#handle_before_consume, #handle_before_enqueue, #handle_consume, #handle_idle, #handle_revoked, #handle_shutdown

Methods included from Dlq::Default

#build_dlq_message, #dispatch_to_dlq, #dispatch_to_dlq?, #find_skippable_message

Instance Method Details

#handle_after_consumeObject

Flow including moving to DLQ in the collapsed mode



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/karafka/pro/processing/strategies/aj/dlq_mom_vp.rb', line 36

def handle_after_consume
  coordinator.on_finished do |last_group_message|
    if coordinator.success?
      coordinator.pause_tracker.reset

      # When this is an ActiveJob running via Pro with virtual partitions, we cannot
      # mark intermediate jobs as processed not to mess up with the ordering.
      # Only when all the jobs are processed and we did not loose the partition
      # assignment and we are not stopping (Pro ActiveJob has an early break) we can
      # commit offsets .
      # For a non virtual partitions case, the flow is regular and state is marked
      # after each successfully processed job
      return if revoked?

      mark_as_consumed(last_group_message)
    elsif coordinator.pause_tracker.attempt <= topic.dead_letter_queue.max_retries
      retry_after_pause
    else
      # Here we are in a collapsed state, hence we can apply the same logic as
      # Aj::DlqMom
      coordinator.pause_tracker.reset
      skippable_message, = find_skippable_message
      dispatch_to_dlq(skippable_message) if dispatch_to_dlq?
      mark_as_consumed(skippable_message)
      pause(coordinator.seek_offset, nil, false)
    end
  end
end