Class: Karafka::Fetcher

Inherits:
Object
  • Object
show all
Defined in:
lib/karafka/fetcher.rb

Overview

Note:

Creating multiple fetchers will result in having multiple connections to the same topics, which means that if there are no partitions, it won’t use them.

Class used to run the Karafka consumer and handle shutting down, restarting etc

Instance Method Summary collapse

Instance Method Details

#fetch_loopObject

Starts listening on all the listeners asynchronously Fetch loop should never end, which means that we won’t create more actor clusters so we don’t have to terminate them



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/karafka/fetcher.rb', line 11

def fetch_loop
  threads = listeners.map do |listener|
    # We abort on exception because there should be an exception handling developed for
    # each listener running in separate threads, so the exceptions should never leak
    # and if that happens, it means that something really bad happened and we should stop
    # the whole process
    Thread
      .new { listener.fetch_loop(processor) }
      .tap { |thread| thread.abort_on_exception = true }
  end

  threads.each(&:join)
# If anything crashes here, we need to raise the error and crush the runner because it means
# that something really bad happened
rescue StandardError => e
  Karafka.monitor.notice_error(self.class, e)
  Karafka::App.stop!
  raise e
end