Module: Karafka::Pro::Processing::Strategies::Default

Overview

No features enabled. No manual offset management. No long running jobs. No virtual partitions. Nothing. Just standard, automatic flow.

Constant Summary

FEATURES =

Apply strategy for a non-feature based flow

%i[].freeze

Instance Method Summary

Methods included from Karafka::Processing::Strategies::Default

#commit_offsets, #commit_offsets!, #handle_eofed, #handle_idle, #handle_initialized, #handle_shutdown, #handle_wrap

Methods included from Karafka::Processing::Strategies::Base

#handle_idle, #handle_shutdown

Instance Method Details

#handle_after_consume ⇒ Object

Standard flow without any features



# File 'lib/karafka/pro/processing/strategies/default.rb', line 354

def handle_after_consume
  coordinator.on_finished do |last_group_message|
    return if revoked?

    if coordinator.success?
      coordinator.pause_tracker.reset

      # Do not mark last message if pause happened. This prevents a scenario where pause
      # is overridden upon rebalance by marking
      return if coordinator.manual_pause?

      mark_as_consumed(last_group_message)
    else
      retry_after_pause
    end
  end
end

#handle_before_consume ⇒ Object

Increments the number of attempts per one “full” job. For all VPs on a single topic partition this should also run only once.



# File 'lib/karafka/pro/processing/strategies/default.rb', line 321

def handle_before_consume
  coordinator.on_started do
    coordinator.pause_tracker.increment
  end
end

#handle_before_schedule_consume ⇒ Object

No actions needed for the standard flow here



# File 'lib/karafka/pro/processing/strategies/default.rb', line 313

def handle_before_schedule_consume
  monitor.instrument('consumer.before_schedule_consume', caller: self)

  nil
end

#handle_before_schedule_tick ⇒ Object

No action needed for the tick standard flow



# File 'lib/karafka/pro/processing/strategies/default.rb', line 389

def handle_before_schedule_tick
  monitor.instrument('consumer.before_schedule_tick', caller: self)

  nil
end

#handle_consume ⇒ Object

Run the user consumption code



# File 'lib/karafka/pro/processing/strategies/default.rb', line 328

def handle_consume
  # We should not run the work at all on a partition that was revoked
  # This can happen primarily when an LRJ job gets to the internal worker queue and
  # this partition is revoked prior to processing.
  unless revoked?
    monitor.instrument('consumer.consume', caller: self)
    monitor.instrument('consumer.consumed', caller: self) do
      consume
    end
  end

  # Mark job as successful
  coordinator.success!(self)
rescue StandardError => e
  # If failed, mark as failed
  coordinator.failure!(self, e)

  # Re-raise so reported in the consumer
  raise e
ensure
  # We need to decrease number of jobs that this coordinator coordinates as it has
  # finished
  coordinator.decrement(:consume)
end

#handle_revoked ⇒ Object

Standard flow for revocation



# File 'lib/karafka/pro/processing/strategies/default.rb', line 373

def handle_revoked
  coordinator.on_revoked do
    resume

    coordinator.revoke
  end

  monitor.instrument('consumer.revoke', caller: self)
  monitor.instrument('consumer.revoked', caller: self) do
    revoked
  end
ensure
  coordinator.decrement(:revoked)
end

#handle_tick ⇒ Object

Runs the consumer #tick method with reporting



# File 'lib/karafka/pro/processing/strategies/default.rb', line 396

def handle_tick
  monitor.instrument('consumer.tick', caller: self)
  monitor.instrument('consumer.ticked', caller: self) do
    tick
  end
ensure
  coordinator.decrement(:periodic)
end

#mark_as_consumed(message, offset_metadata = @_current_offset_metadata) ⇒ Boolean

Note:

We keep track of this offset in case we mark a message as consumed and then get an error when processing another message. In such a case we do not pause on the message we have already processed, but rather on the next one. This applies to both the sync and async versions of this method.

Marks message as consumed in an async way.

Parameters:

  • message (Messages::Message)

    last successfully processed message.

  • offset_metadata (String, nil) (defaults to: @_current_offset_metadata)

    offset metadata string or nil if nothing

Returns:

  • (Boolean)

    true if we were able to mark the offset, false otherwise. False indicates that we were not able and that we have lost the partition.



# File 'lib/karafka/pro/processing/strategies/default.rb', line 65

def mark_as_consumed(message, offset_metadata = @_current_offset_metadata)
  # If we are inside a transaction then we can just mark as consumed within it
  if @_in_transaction
    mark_in_transaction(message, offset_metadata, true)
  elsif @_in_transaction_marked
    mark_in_memory(message)
  else
    # seek offset can be nil only in case `#seek` was invoked with offset reset request
    # In case like this we ignore marking
    return true if seek_offset.nil?
    # Ignore if it is the same offset as the one that is marked currently
    # We ignore second marking because it changes nothing and in case of people using
    # metadata storage but with automatic offset marking, this would cause metadata to be
    # erased by automatic marking
    return true if (seek_offset - 1) == message.offset
    return false if revoked?

    # If we are not inside a transaction but this is a transactional topic, we mark with
    # artificially created transaction
    stored = if producer.transactional?
               mark_with_transaction(message, offset_metadata, true)
             elsif @_transactional_marking
               raise Errors::NonTransactionalMarkingAttemptError
             else
               client.mark_as_consumed(message, offset_metadata)
             end

    return revoked? unless stored

    self.seek_offset = message.offset + 1
  end

  true
ensure
  @_current_offset_metadata = nil
end
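
For illustration, a minimal consumer sketch of how the async marking could be used from user code; the consumer class and the Event model are hypothetical assumptions, not part of this API:

class EventsConsumer < ApplicationConsumer
  def consume
    messages.each do |message|
      # Hypothetical domain logic
      Event.create!(message.payload)

      # Async marking; returns false when the partition assignment was lost,
      # in which case there is no point in processing the rest of the batch
      break unless mark_as_consumed(message)
    end
  end
end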

#mark_as_consumed!(message, offset_metadata = @_current_offset_metadata) ⇒ Boolean

Marks message as consumed in a sync way.

Parameters:

  • message (Messages::Message)

    last successfully processed message.

  • offset_metadata (String, nil) (defaults to: @_current_offset_metadata)

    offset metadata string or nil if nothing

Returns:

  • (Boolean)

    true if we were able to mark the offset, false otherwise. False indicates that we were not able and that we have lost the partition.



# File 'lib/karafka/pro/processing/strategies/default.rb', line 108

def mark_as_consumed!(message, offset_metadata = @_current_offset_metadata)
  if @_in_transaction
    mark_in_transaction(message, offset_metadata, false)
  elsif @_in_transaction_marked
    mark_in_memory(message)
  else
    # seek offset can be nil only in case `#seek` was invoked with offset reset request
    # In case like this we ignore marking
    return true if seek_offset.nil?
    # Ignore if it is the same offset as the one that is marked currently
    # We ignore second marking because it changes nothing and in case of people using
    # metadata storage but with automatic offset marking, this would cause metadata to be
    # erased by automatic marking
    return true if (seek_offset - 1) == message.offset
    return false if revoked?

    # If we are not inside a transaction but this is a transactional topic, we mark with
    # artificially created transaction
    stored = if producer.transactional?
               mark_with_transaction(message, offset_metadata, false)
             elsif @_transactional_marking
               raise Errors::NonTransactionalMarkingAttemptError
             else
               client.mark_as_consumed!(message, offset_metadata)
             end

    return revoked? unless stored

    self.seek_offset = message.offset + 1
  end

  true
ensure
  @_current_offset_metadata = nil
end
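
For illustration, a hedged sketch of the blocking variant, useful when we need certainty that the offset has reached Kafka before moving on; the persistence step is a hypothetical assumption:

def consume
  messages.each { |message| persist(message) } # hypothetical persistence step

  # Blocking marking: waits until the offset is committed to Kafka and returns
  # false if the partition assignment was lost in the meantime
  mark_as_consumed!(messages.last)
end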

#mark_in_memory(message) ⇒ Boolean

Marks the current state only in memory as the offset marking has already happened using the producer transaction

Parameters:

  • message (Messages::Message)

    message we want to mark as consumed in the in-memory state

Returns:

  • (Boolean)

    true if all good, false if we lost assignment and no point in marking



# File 'lib/karafka/pro/processing/strategies/default.rb', line 293

def mark_in_memory(message)
  # seek offset can be nil only in case `#seek` was invoked with offset reset request
  # In case like this we ignore marking
  return true if seek_offset.nil?
  # Ignore if it is the same offset as the one that is marked currently
  # We ignore second marking because it changes nothing and in case of people using
  # metadata storage but with automatic offset marking, this would cause metadata to be
  # erased by automatic marking
  return true if (seek_offset - 1) == message.offset
  return false if revoked?

  # If we have already marked this successfully in a transaction that was running
  # we should not mark it again with the client offset delegation but instead we should
  # just align the in-memory state
  self.seek_offset = message.offset + 1

  true
end

#mark_in_transaction(message, offset_metadata, async) ⇒ Object

Stores the next offset for processing inside of the transaction and stores it in a local accumulator for post-transaction status update

Parameters:

  • message (Messages::Message)

    message we want to commit inside of a transaction

  • offset_metadata (String, nil)

    offset metadata or nil if none

  • async (Boolean)

    should we mark in an async or sync way (applicable only to post-transaction state synchronization usage, as within the transaction it is always sync)

Raises:

  • (Errors::TransactionRequiredError)

    when invoked outside of an active transaction

  • (Errors::AssignmentLostError)

    when the partition assignment was lost

# File 'lib/karafka/pro/processing/strategies/default.rb', line 245

def mark_in_transaction(message, offset_metadata, async)
  raise Errors::TransactionRequiredError unless @_in_transaction
  raise Errors::AssignmentLostError if revoked?

  producer.transaction_mark_as_consumed(
    client,
    message,
    offset_metadata
  )

  # This one is long lived and used to make sure, that users do not mix transactional
  # marking with non-transactional. When this happens we should raise error
  @_transactional_marking = true
  @_in_transaction_marked = true
  @_transaction_marked ||= []
  @_transaction_marked << [message, offset_metadata, async]
end

#mark_with_transaction(message, offset_metadata, async) ⇒ Boolean

Returns false if marking failed, otherwise true.

Parameters:

  • message (Messages::Message)

    message we want to commit inside of a transaction

  • offset_metadata (String, nil)

    offset metadata or nil if none

  • async (Boolean)

    should we mark in an async or sync way (applicable only to post-transaction state synchronization usage, as within the transaction it is always sync)

Returns:

  • (Boolean)

    false if marking failed, otherwise true



# File 'lib/karafka/pro/processing/strategies/default.rb', line 269

def mark_with_transaction(message, offset_metadata, async)
  # This flag is used by VPs to differentiate between user initiated transactions and
  # post-execution system transactions.
  @_transaction_internal = true

  transaction do
    mark_in_transaction(message, offset_metadata, async)
  end

  true
# We handle both cases here because this is a private API for internal usage and we want
# the post-user code execution marking with transactional producer to result in a
# boolean state of marking for further framework flow. This is a normalization to make it
# behave the same way as it would behave with a non-transactional one
rescue Rdkafka::RdkafkaError, Errors::AssignmentLostError
  false
ensure
  @_transaction_internal = false
end

#store_offset_metadata(offset_metadata) ⇒ Object

Note:

Please be aware that offset metadata set this way will be passed to any marking as consumed, even if it was not user initiated, for example in the DLQ flow.

Allows setting offset metadata that will be used with the upcoming marking as consumed, as long as a different offset metadata is not provided explicitly. After it has been used, either via #mark_as_consumed or #mark_as_consumed!, it will be set back to nil. It is done that way to give the end user the ability to influence metadata on non-user initiated markings in complex flows.

Parameters:

  • offset_metadata (String, nil)

    metadata we want to store with the upcoming marking as consumed



# File 'lib/karafka/pro/processing/strategies/default.rb', line 50

def store_offset_metadata(offset_metadata)
  @_current_offset_metadata = offset_metadata
end
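
For illustration, a hedged sketch of storing metadata ahead of marking; the processing helper and the JSON metadata shape are illustrative assumptions:

def consume
  messages.each do |message|
    process(message) # hypothetical processing step

    # Metadata stored here is attached to the next marking (user initiated or
    # not) and cleared once it has been used
    store_offset_metadata({ processed_at: Time.now.to_i }.to_json)
    mark_as_consumed(message)
  end
end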

#transaction(active_producer = producer) { ... } ⇒ Object

Note:

Please note that if you provide a producer, it will replace the consumer's producer for the duration of the transaction. This means that if you refer, even accidentally, to `Consumer#producer` from other threads, it will contain the reassigned producer and not the initially used/assigned one. It is done that way so the message producing aliases operate from within transactions, and since the producer in a transaction is locked, this prevents other threads from using it.

Starts a producer transaction, saves the transaction context for transactional marking and runs user code in this context.

Transactions on a consumer level differ from those initiated by the producer, as they allow marking offsets inside of the transaction. If the transaction is initialized only from the consumer, the offset will be stored in a regular fashion.

Parameters:

  • active_producer (WaterDrop::Producer) (defaults to: producer)

    alternative producer instance we may want to use. It is useful when we have a connection pool or any other selective engine for managing multiple producers. If not provided, the default producer taken from #producer will be used.

Yields:

  • code that we want to run in a transaction



# File 'lib/karafka/pro/processing/strategies/default.rb', line 164

def transaction(active_producer = producer)
  default_producer = nil
  transaction_started = nil

  monitor.instrument('consumer.consuming.transaction', caller: self) do
    default_producer = producer
    self.producer = active_producer

    transaction_started = false
    transaction_completed = false

    # Prevent from nested transactions. It would not make any sense
    raise Errors::TransactionAlreadyInitializedError if @_in_transaction

    transaction_started = true
    @_transaction_marked = []
    @_in_transaction = true
    @_in_transaction_marked = false

    producer.transaction do
      yield

      # Ensure this transaction is rolled back if we have lost the ownership of this
      # transaction. We do it only for transactions that contain offset management as for
      # producer only, this is not relevant.
      raise Errors::AssignmentLostError if @_in_transaction_marked && revoked?

      # If we do not reach this, we should not move seek offsets because it means that
      # either an error occurred or the transaction was aborted.
      # In case of an error, it will bubble up so there is no issue, but in case of an
      # abort, while we do not reach this place, the code will continue
      transaction_completed = true
    end

    @_in_transaction = false

    # This offset is already stored in transaction but we set it here anyhow because we
    # want to make sure our internal in-memory state is aligned with the transaction
    #
    # @note We never need to use the blocking `#mark_as_consumed!` here because the
    #   offset anyhow was already stored during the transaction
    #
    # @note Since the offset could have been already stored in Kafka (could have because
    #   you can have transactions without marking), we use the `@_in_transaction_marked`
    #   state to decide if we need to dispatch the offset via client at all
    #   (if post transaction, then we do not have to)
    #
    # @note In theory we could only keep reference to the most recent marking and reject
    #   others. We however do not do it for two reasons:
    #   - User may have non standard flow relying on some alternative order and we want
    #     to mimic this
    #   - Complex strategies like VPs can use this to mark in parallel without
    #     having to redefine the transactional flow completely
    #
    # @note This should be applied only if transaction did not error and if it was not
    #   aborted.
    if transaction_completed
      @_transaction_marked.each do |marking|
        marking.pop ? mark_as_consumed(*marking) : mark_as_consumed!(*marking)
      end
    end

    true
  end
ensure
  self.producer = default_producer

  if transaction_started
    @_transaction_marked.clear
    @_in_transaction = false
    @_in_transaction_marked = false
  end
end
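
For illustration, a hedged sketch of a consumer-level transaction that combines producing with transactional offset marking; the target topic and payload handling are illustrative assumptions, and a transactional producer is required:

def consume
  transaction do
    messages.each do |message|
      # Messages produced here and the offset marked below are committed atomically
      producer.produce_async(topic: 'enriched_events', payload: message.raw_payload)
    end

    mark_as_consumed(messages.last)
  end
end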