Module: Karafka::Pro::Processing::Strategies::Dlq::Default

Includes:
Karafka::Pro::Processing::Strategies::Default
Included in:
Aj::DlqLrjMom, Ftr, FtrMom, Lrj, Mom, Vp
Defined in:
lib/karafka/pro/processing/strategies/dlq/default.rb

Overview

Only dead letter queue enabled

Constant Summary collapse

FEATURES =

Features for this strategy

%i[
  dead_letter_queue
].freeze

Instance Method Summary collapse

Methods included from Karafka::Pro::Processing::Strategies::Default

#handle_before_consume, #handle_before_enqueue, #handle_consume, #handle_revoked

Methods included from Karafka::Processing::Strategies::Default

#commit_offsets, #commit_offsets!, #handle_before_consume, #handle_before_enqueue, #handle_consume, #handle_idle, #handle_revoked, #handle_shutdown, #mark_as_consumed, #mark_as_consumed!

Methods included from Karafka::Processing::Strategies::Base

#handle_before_consume, #handle_before_enqueue, #handle_consume, #handle_idle, #handle_revoked, #handle_shutdown

Instance Method Details

#build_dlq_message(skippable_message) ⇒ Hash

Returns dispatch DLQ message.

Parameters:

Returns:

  • (Hash)

    dispatch DLQ message



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/karafka/pro/processing/strategies/dlq/default.rb', line 99

def build_dlq_message(skippable_message)
  original_partition = skippable_message.partition.to_s

  dlq_message = {
    topic: topic.dead_letter_queue.topic,
    key: original_partition,
    payload: skippable_message.raw_payload,
    headers: skippable_message.headers.merge(
      'original_topic' => topic.name,
      'original_partition' => original_partition,
      'original_offset' => skippable_message.offset.to_s,
      'original_consumer_group' => topic.consumer_group.id
    )
  }

  # Optional method user can define in consumer to enhance the dlq message hash with
  # some extra details if needed or to replace payload, etc
  if respond_to?(:enhance_dlq_message, true)
    enhance_dlq_message(
      dlq_message,
      skippable_message
    )
  end

  dlq_message
end

#dispatch_to_dlq(skippable_message) ⇒ Object

Moves the broken message into a separate queue defined via the settings

Parameters:

Raises:



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/karafka/pro/processing/strategies/dlq/default.rb', line 76

def dispatch_to_dlq(skippable_message)
  # DLQ should never try to dispatch a message that was cleaned. It message was
  # cleaned, we will not have all the needed data. If you see this error, it means
  # that your processing flow is not as expected and you have cleaned message that
  # should not be cleaned as it should go to the DLQ
  raise(Cleaner::Errors::MessageCleanedError) if skippable_message.cleaned?

  producer.produce_async(
    build_dlq_message(
      skippable_message
    )
  )

  # Notify about dispatch on the events bus
  Karafka.monitor.instrument(
    'dead_letter_queue.dispatched',
    caller: self,
    message: skippable_message
  )
end

#dispatch_to_dlq?Boolean

Returns should we dispatch the message to DLQ or not. When the dispatch topic is set to false, we will skip the dispatch, effectively ignoring the broken message without taking any action.

Returns:

  • (Boolean)

    should we dispatch the message to DLQ or not. When the dispatch topic is set to false, we will skip the dispatch, effectively ignoring the broken message without taking any action.



129
130
131
# File 'lib/karafka/pro/processing/strategies/dlq/default.rb', line 129

def dispatch_to_dlq?
  topic.dead_letter_queue.topic
end

#find_skippable_messageArray<Karafka::Messages::Message, Boolean>

Finds the message may want to skip (all, starting from first)

Returns:

  • (Array<Karafka::Messages::Message, Boolean>)

    message we may want to skip and information if this message was from marked offset or figured out via mom flow



60
61
62
63
64
65
66
67
68
69
70
# File 'lib/karafka/pro/processing/strategies/dlq/default.rb', line 60

def find_skippable_message
  skippable_message = messages.find do |msg|
    coordinator.marked? && msg.offset == coordinator.seek_offset
  end

  # If we don't have the message matching the last comitted offset, it means that
  # user operates with manual offsets and we're beyond the batch in which things
  # broke for the first time. Then we skip the first (as no markings) and we
  # move on one by one.
  skippable_message ? [skippable_message, true] : [messages.first, false]
end

#handle_after_consumeObject

When we encounter non-recoverable message, we skip it and go on with our lives



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/karafka/pro/processing/strategies/dlq/default.rb', line 30

def handle_after_consume
  coordinator.on_finished do |last_group_message|
    return if revoked?

    if coordinator.success?
      coordinator.pause_tracker.reset

      return if coordinator.manual_pause?

      mark_as_consumed(last_group_message)
    elsif coordinator.pause_tracker.attempt <= topic.dead_letter_queue.max_retries
      retry_after_pause
    # If we've reached number of retries that we could, we need to skip the first
    # message that was not marked as consumed, pause and continue, while also moving
    # this message to the dead topic
    else
      # We reset the pause to indicate we will now consider it as "ok".
      coordinator.pause_tracker.reset
      skippable_message, = find_skippable_message
      dispatch_to_dlq(skippable_message) if dispatch_to_dlq?
      mark_as_consumed(skippable_message)
      pause(coordinator.seek_offset, nil, false)
    end
  end
end