Module: Karafka::Pro::Routing::Features::VirtualPartitions::Topic

Defined in:
lib/karafka/pro/routing/features/virtual_partitions/topic.rb

Overview

Topic extensions to be able to manage virtual partitions feature

Instance Method Summary collapse

Instance Method Details

#to_hHash



57
58
59
60
61
# File 'lib/karafka/pro/routing/features/virtual_partitions/topic.rb', line 57

def to_h
  super.merge(
    virtual_partitions: virtual_partitions.to_h
  ).freeze
end

#virtual_partitions(max_partitions: Karafka::App.config.concurrency, partitioner: nil, offset_metadata_strategy: :current, reducer: nil) ⇒ VirtualPartitions



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/karafka/pro/routing/features/virtual_partitions/topic.rb', line 33

def virtual_partitions(
  max_partitions: Karafka::App.config.concurrency,
  partitioner: nil,
  offset_metadata_strategy: :current,
  reducer: nil
)
  @virtual_partitions ||= Config.new(
    active: !partitioner.nil?,
    max_partitions: max_partitions,
    partitioner: partitioner,
    offset_metadata_strategy: ,
    # If no reducer provided, we use this one. It just runs a modulo on the sum of
    # a stringified version, providing fairly good distribution.
    reducer: reducer || ->(virtual_key) { virtual_key.to_s.sum % max_partitions }
  )
end

#virtual_partitions?Boolean



51
52
53
# File 'lib/karafka/pro/routing/features/virtual_partitions/topic.rb', line 51

def virtual_partitions?
  virtual_partitions.active?
end