Module: Karafka::Consumers::Callbacks

Defined in:
lib/karafka/consumers/callbacks.rb

Overview

Additional callbacks that can trigger actions at certain moments, such as manual offset management, committing, or anything else outside the standard message flow. They are not included by default, because we don't want to provide functionality that users do not need by default. Please refer to the wiki callbacks page for more details on how to use them.

Defined Under Namespace

Modules: ClassMethods

Constant Summary

TYPES =

Types of events on which we run callbacks

%i[
  after_fetch
  after_poll
  before_poll
  before_stop
].freeze

Class Method Summary

Instance Method Summary

Class Method Details

.included(consumer_class) ⇒ Object

Parameters:

  • consumer_class (Class)

    consumer class that we extend with callbacks



# File 'lib/karafka/consumers/callbacks.rb', line 32

def self.included(consumer_class)
  consumer_class.class_eval do
    extend ClassMethods
    include ActiveSupport::Callbacks

    # The call method is wrapped with a set of callbacks.
    # We won't run process if any of the callbacks throws :abort.
    # @see http://api.rubyonrails.org/classes/ActiveSupport/Callbacks/ClassMethods.html#method-i-get_callbacks
    TYPES.each { |type| define_callbacks type }
  end
end
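
The hook above relies on Ruby's `Module.included` callback to add class-level behaviour to whichever consumer includes the module. A minimal, dependency-free sketch of that pattern might look as follows. `DemoCallbacks`, `DemoConsumer`, `callback_registry`, and `register` are hypothetical names, and a plain hash stands in for what `ActiveSupport::Callbacks` and `define_callbacks` provide in the real module.

```ruby
# Hypothetical stand-in for the include hook pattern; it does not use
# ActiveSupport and is not Karafka's actual implementation.
module DemoCallbacks
  TYPES = %i[after_fetch after_poll before_poll before_stop].freeze

  # Class-level API added to every class that includes DemoCallbacks.
  module ClassMethods
    # Per-class registry: callback type => list of registered blocks.
    def callback_registry
      @callback_registry ||= TYPES.to_h { |type| [type, []] }
    end

    # Registers a block under one of the known callback types.
    def register(type, &block)
      raise ArgumentError, "unknown callback type: #{type}" unless TYPES.include?(type)

      callback_registry[type] << block
    end
  end

  # Ruby invokes this hook whenever the module is included into a class.
  def self.included(consumer_class)
    consumer_class.extend(ClassMethods)
  end
end

class DemoConsumer
  include DemoCallbacks

  register(:after_fetch) { :fetched }
end
```

Extending inside `included` means a single `include` line gives the consumer both the instance methods and the class-level registration API, which is presumably why the real module takes the same route.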

Instance Method Details

#call ⇒ Object

Executes the default consumer flow: runs the callbacks and, if they do not halt the chain, calls the process method of the proper backend. It is defined here because it interacts with the default Karafka call flow and needs to be overridden to support callbacks.



# File 'lib/karafka/consumers/callbacks.rb', line 47

def call
  run_callbacks :after_fetch do
    process
  end
end
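
To illustrate the halting behaviour described above, here is a small sketch in plain Ruby. `SketchConsumer`, `before_steps`, and the simplified `run_callbacks` are assumptions made for this example. They only imitate how a callback that throws `:abort` prevents `process` from running, and they are not ActiveSupport's implementation.

```ruby
# Hypothetical consumer with a simplified, hand-written run_callbacks.
class SketchConsumer
  attr_reader :log

  def initialize(halt:)
    @halt = halt
    @log = []
  end

  # Same shape as #call above: process runs only if the chain is not halted.
  def call
    run_callbacks(:after_fetch) { process }
  end

  private

  # Runs each registered step; a step halts the chain with `throw :abort`.
  def run_callbacks(type)
    halted = true
    catch(:abort) do
      before_steps(type).each(&:call)
      halted = false
    end
    return false if halted

    yield
  end

  # A single illustrative step that aborts when the consumer was built with halt: true.
  def before_steps(_type)
    [-> { @log << :checked; throw :abort if @halt }]
  end

  def process
    @log << :processed
  end
end

passing = SketchConsumer.new(halt: false)
passing.call

halted = SketchConsumer.new(halt: true)
halted.call
```

In this sketch `passing.log` ends up containing both `:checked` and `:processed`, while `halted.log` contains only `:checked`, because the thrown `:abort` skips the block that would have called `process`.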