Module: Karafka::Patches::RubyKafka

Defined in:
lib/karafka/patches/ruby_kafka.rb

Overview

Patches for Ruby Kafka gem

Instance Method Summary collapse

Instance Method Details

#consumer_loopObject

This patch allows us to inject business logic in between fetches and before the consumer stop, so we can perform stop commit or anything else that we need since ruby-kafka fetch loop does not allow that directly We don’t won’t to use poll ruby-kafka api as it brings many more problems that we would have to take care of. That way, nothing like that ever happens but we get the control over the stopping process that we need (since we’re the once that initiate it for each thread)



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/karafka/patches/ruby_kafka.rb', line 15

def consumer_loop
  super do
    consumers = Karafka::Persistence::Consumers
                .current
                .values
                .flat_map(&:values)
                .select { |consumer| consumer.class.respond_to?(:after_fetch) }

    if Karafka::App.stopping?
      publish_event(consumers, 'before_stop')
      Karafka::Persistence::Client.read.stop
    else
      publish_event(consumers, 'before_poll')
      yield
      publish_event(consumers, 'after_poll')
    end
  end
end