Module: Karafka::Consumers::Callbacks

Defined in:
lib/karafka/consumers/callbacks.rb

Overview

Additional callbacks that can be used to trigger actions at certain moments, such as manual offset management, committing, or anything else outside of the standard message flow. They are not included by default, because we don't want to provide functionality that users do not need by default. Please refer to the wiki callbacks page for more details on how to use them.

Defined Under Namespace

Modules: ClassMethods

Class Method Summary

Instance Method Summary

Class Method Details

.included(consumer_class) ⇒ Object

Parameters:

  • consumer_class (Class)

    consumer class that we extend with callbacks



# File 'lib/karafka/consumers/callbacks.rb', line 49

def included(consumer_class)
  consumer_class.class_eval do
    extend ClassMethods
  end
end
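The `included` hook above follows a common Ruby pattern: including a module in a class also extends that class with class-level methods. The sketch below illustrates the pattern in isolation. `DemoCallbacks`, `DemoConsumer`, and `after_fetch_hooks` are hypothetical names chosen for illustration, and the `after_fetch` body is an assumption, not Karafka's actual implementation.

```ruby
# Hypothetical stand-in for a callbacks module; not Karafka's actual code.
module DemoCallbacks
  module ClassMethods
    # Registers a block to run after messages are fetched (illustrative only).
    def after_fetch(&block)
      after_fetch_hooks << block
    end

    # Lazily initialized per-class list of registered blocks.
    def after_fetch_hooks
      @after_fetch_hooks ||= []
    end
  end

  # Ruby invokes this hook whenever the module is included in a class.
  def self.included(consumer_class)
    consumer_class.extend(ClassMethods)
  end
end

class DemoConsumer
  include DemoCallbacks

  # Available at class level only because `included` extended the class.
  after_fetch { :fetched }
end
```

Calling `extend` directly on the class has the same effect as the `class_eval do ... extend ClassMethods ... end` form shown above; both add the methods of `ClassMethods` as singleton methods of the including class.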

Instance Method Details

#call ⇒ Object

Executes the default consumer flow, runs callbacks and, if not halted, calls the process method of the proper backend. It is defined here because it interacts with the default Karafka call flow and needs to be overridden to support callbacks.



# File 'lib/karafka/consumers/callbacks.rb', line 59

def call
  if self.class.respond_to?(:after_fetch)
    Karafka::App.monitor.instrument(
      "consumers.#{Helpers::Inflector.map(self.class.to_s)}.after_fetch",
      context: self
    )
  end

  process
end
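The flow in `#call` can be sketched without Karafka: an event is emitted only when the consumer class responds to `after_fetch`, and `process` runs afterwards in either case. In this minimal sketch, `DemoMonitor`, `MONITOR`, `PlainConsumer`, and `CallbackConsumer` are hypothetical stand-ins, and the event name uses a simple `downcase` in place of `Helpers::Inflector.map`, whose exact output is not shown here.

```ruby
# Hypothetical monitor that records events; stands in for Karafka::App.monitor.
class DemoMonitor
  attr_reader :events

  def initialize
    @events = []
  end

  # Stores the event name together with its payload hash.
  def instrument(name, payload = {})
    @events << [name, payload]
  end
end

MONITOR = DemoMonitor.new

class PlainConsumer
  attr_reader :processed

  def call
    # Emit the event only for classes that define an after_fetch class method.
    if self.class.respond_to?(:after_fetch)
      # Assumption: downcase replaces the real inflector for brevity.
      MONITOR.instrument("consumers.#{self.class.name.downcase}.after_fetch", context: self)
    end

    process
  end

  def process
    @processed = true
  end
end

class CallbackConsumer < PlainConsumer
  # Presence of this class method is what triggers the instrumentation.
  def self.after_fetch; end
end

plain = PlainConsumer.new
plain.call

with_callbacks = CallbackConsumer.new
with_callbacks.call
```

After both calls, only `CallbackConsumer` has produced an event in `MONITOR.events`, while both consumers have run `process`.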