Class: Karafka::Connection::Listener

Inherits:
Object
Includes:
Helpers::Async
Defined in:
lib/karafka/connection/listener.rb

Overview

A single listener that listens to incoming messages from a single subscription group. It polls the messages and then enqueues jobs. It also takes care of potential recovery from critical errors by restarting everything in a safe manner.

This is the heart of the consumption process.
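
The poll-then-enqueue cycle described above can be sketched roughly as follows. This is a minimal illustration, not the listener's actual implementation: FakeClient, SketchListener, and the batch_poll method are hypothetical stand-ins, and a plain Ruby Queue plays the role of the jobs queue.

```ruby
# Hypothetical stand-in for a Kafka client; not part of Karafka.
class FakeClient
  def initialize(batches)
    @batches = batches.dup
  end

  # Returns the next batch of messages, or nil when nothing is left to poll.
  def batch_poll
    @batches.shift
  end
end

# Hypothetical listener that polls one batch at a time and enqueues one job
# per non-empty batch before polling again.
class SketchListener
  def initialize(client, jobs_queue)
    @client = client
    @jobs_queue = jobs_queue
  end

  def call
    while (batch = @client.batch_poll)
      # Nothing to schedule for an empty poll result.
      next if batch.empty?

      @jobs_queue << { messages: batch }
    end
  end
end

jobs_queue = Queue.new
client = FakeClient.new([%w[a b], [], %w[c]])
SketchListener.new(client, jobs_queue).call
```

Because each batch is turned into a job before the next poll happens, a single reusable buffer is enough to hold the polled messages in this kind of loop.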

Instance Attribute Summary

Instance Method Summary

Methods included from Helpers::Async

#async_call, included

Constructor Details

#initialize(consumer_group_coordinator, subscription_group, jobs_queue) ⇒ Karafka::Connection::Listener

Returns listener instance.

Parameters:

  • consumer_group_coordinator
  • subscription_group
  • jobs_queue

# File 'lib/karafka/connection/listener.rb', line 21

def initialize(consumer_group_coordinator, subscription_group, jobs_queue)
  proc_config = ::Karafka::App.config.internal.processing

  @id = SecureRandom.hex(6)
  @consumer_group_coordinator = consumer_group_coordinator
  @subscription_group = subscription_group
  @jobs_queue = jobs_queue
  @coordinators = Processing::CoordinatorsBuffer.new(subscription_group.topics)
  @client = Client.new(@subscription_group)
  @executors = Processing::ExecutorsBuffer.new(@client, subscription_group)
  @jobs_builder = proc_config.jobs_builder
  @partitioner = proc_config.partitioner_class.new(subscription_group)
  # We reference scheduler here as it is much faster than fetching this each time
  @scheduler = proc_config.scheduler
  # We keep one buffer for messages to preserve memory and not allocate extra objects
  # We can do this that way because we always first schedule jobs using messages before we
  # fetch another batch.
  @messages_buffer = MessagesBuffer.new(subscription_group)
  @mutex = Mutex.new
  @stopped = false
end

Instance Attribute Details

#id ⇒ String (readonly)

Can be useful for logging

Returns:

  • (String)

    id of this listener



# File 'lib/karafka/connection/listener.rb', line 15

def id
  @id
end

Instance Method Details

#call ⇒ Object

Note:

Prefetch callbacks can be used to seek to an offset or do other setup work before data consumption actually starts.

Runs the main listener fetch loop.



# File 'lib/karafka/connection/listener.rb', line 47

def call
  Karafka.monitor.instrument(
    'connection.listener.before_fetch_loop',
    caller: self,
    client: @client,
    subscription_group: @subscription_group
  )

  fetch_loop
end
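
The instrument call above publishes an event that runs before the fetch loop starts, which is what makes prefetch callbacks possible. The publish/subscribe pattern behind it can be sketched as below. TinyMonitor is a hypothetical stand-in written for this example and is not Karafka's monitor; only the event name and the subscription_group payload key are taken from the source above, and the payload values are made up.

```ruby
# Hypothetical, minimal publish/subscribe monitor; not Karafka's implementation.
class TinyMonitor
  def initialize
    # Each event name maps to the list of blocks subscribed to it.
    @subscribers = Hash.new { |hash, key| hash[key] = [] }
  end

  def subscribe(event_name, &block)
    @subscribers[event_name] << block
  end

  # Calls every subscriber of the event synchronously with the given payload.
  def instrument(event_name, payload = {})
    @subscribers[event_name].each { |subscriber| subscriber.call(payload) }
  end
end

monitor = TinyMonitor.new
seen = []

# A "prefetch callback": it runs before any fetching begins.
monitor.subscribe('connection.listener.before_fetch_loop') do |payload|
  seen << payload[:subscription_group]
end

monitor.instrument(
  'connection.listener.before_fetch_loop',
  caller: :listener,
  subscription_group: 'group_a'
)
```

Since subscribers in this sketch are invoked synchronously, any work they do finishes before the code following instrument runs.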

#shutdown ⇒ Object

Note:

This method is not private despite being part of the fetch loop because in case of a forceful shutdown, it may be invoked from a separate thread

Note:

We wrap it with a mutex exactly because of the above case of forceful shutdown

Stops the jobs queue, triggers shutdown on all the executors (sync), commits offsets and stops the Kafka client.



# File 'lib/karafka/connection/listener.rb', line 65

def shutdown
  return if @stopped

  @mutex.synchronize do
    @stopped = true
    @executors.clear
    @coordinators.reset
    @client.stop
  end
end
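
The mutex-guarded shutdown can be illustrated with a small standalone sketch. SketchStopper and its stop_count counter are hypothetical names used only here. Note one deliberate difference: this variant checks the stopped flag inside the lock rather than before it, so that concurrent callers cannot both pass the check. That is a design choice of the sketch, not a description of the method above.

```ruby
# Hypothetical class showing an idempotent, thread-safe shutdown.
class SketchStopper
  attr_reader :stop_count

  def initialize
    @mutex = Mutex.new
    @stopped = false
    @stop_count = 0
  end

  def shutdown
    @mutex.synchronize do
      # Checked under the lock, so only the first caller does the cleanup.
      return if @stopped

      @stopped = true
      @stop_count += 1 # stands in for releasing executors and the client
    end
  end
end

stopper = SketchStopper.new

# Simulate a regular shutdown racing with forceful ones from other threads.
threads = 4.times.map { Thread.new { stopper.shutdown } }
threads.each(&:join)
```

However many threads call shutdown, the cleanup step in this sketch runs once.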