Module: Karafka::Pro::Routing::Features::VirtualPartitions::Topic
Defined in: lib/karafka/pro/routing/features/virtual_partitions/topic.rb
Overview
Topic extensions for managing the virtual partitions feature.
Instance Method Summary
- #to_h ⇒ Hash
  Topic with all its native configuration options plus the virtual partitions namespace settings.
- #virtual_partitions(max_partitions: Karafka::App.config.concurrency, partitioner: nil, offset_metadata_strategy: :current, reducer: nil) ⇒ VirtualPartitions
  Sets the virtual partitions details during routing configuration and returns them on later calls.
- #virtual_partitions? ⇒ Boolean
  Whether virtual partitions are enabled for the given topic.
Instance Method Details
#to_h ⇒ Hash
    # File 'lib/karafka/pro/routing/features/virtual_partitions/topic.rb', line 57
    def to_h
      super.merge(
        virtual_partitions: virtual_partitions.to_h
      ).freeze
    end
#virtual_partitions(max_partitions: Karafka::App.config.concurrency, partitioner: nil, offset_metadata_strategy: :current, reducer: nil) ⇒ VirtualPartitions
    # File 'lib/karafka/pro/routing/features/virtual_partitions/topic.rb', line 33
    def virtual_partitions(
      max_partitions: Karafka::App.config.concurrency,
      partitioner: nil,
      offset_metadata_strategy: :current,
      reducer: nil
    )
      @virtual_partitions ||= Config.new(
        active: !partitioner.nil?,
        max_partitions: max_partitions,
        partitioner: partitioner,
        offset_metadata_strategy: offset_metadata_strategy,
        # If no reducer provided, we use this one. It just runs a modulo on the sum of
        # a stringified version, providing fairly good distribution.
        reducer: reducer || ->(virtual_key) { virtual_key.to_s.sum % max_partitions }
      )
    end
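To illustrate the fallback reducer described in the source comment above, here is a minimal standalone sketch. It copies the lambda from the method body but fixes max_partitions at 5 instead of reading Karafka::App.config.concurrency; the sample keys and the names default_reducer and assignments are made up for illustration.

```ruby
# Standalone sketch of the fallback reducer. max_partitions is hard-coded
# here (an assumption); in the method above it defaults to
# Karafka::App.config.concurrency.
max_partitions = 5

# Same expression as in the source: stringify the key, sum its bytes
# (String#sum), then take the result modulo the partition count.
default_reducer = ->(virtual_key) { virtual_key.to_s.sum % max_partitions }

default_reducer.call("abc") # => 4 (97 + 98 + 99 = 294, and 294 % 5 = 4)

# Hypothetical virtual keys, as a partitioner might return them.
keys = ["user-1", "user-2", "user-3", 42]

# Every key maps to an index in 0...max_partitions, and the same key
# always maps to the same index.
assignments = keys.to_h { |key| [key, default_reducer.call(key)] }
assignments.each { |key, index| puts "#{key.inspect} -> virtual partition #{index}" }
```

Because the result depends only on the stringified key, equal keys always land in the same virtual partition. Passing a custom reducer: replaces this lambda entirely.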
#virtual_partitions? ⇒ Boolean
    # File 'lib/karafka/pro/routing/features/virtual_partitions/topic.rb', line 51
    def virtual_partitions?
      virtual_partitions.active?
    end
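The interplay between #virtual_partitions and #virtual_partitions? can be sketched without Karafka. The classes below (SketchConfig, SketchTopic) are hypothetical stand-ins, not the library's own, and only mirror two behaviors visible in the source above: the active flag is derived from whether a partitioner was given, and the `||=` memoization means the first call fixes the settings.

```ruby
# Hypothetical stand-in for the Config object used in the source.
SketchConfig = Struct.new(:active, :max_partitions, :partitioner, keyword_init: true) do
  def active?
    active
  end
end

# Hypothetical stand-in for a topic that includes this module.
class SketchTopic
  def virtual_partitions(max_partitions: 5, partitioner: nil)
    # Memoized: only the first call builds the config.
    @virtual_partitions ||= SketchConfig.new(
      active: !partitioner.nil?,
      max_partitions: max_partitions,
      partitioner: partitioner
    )
  end

  def virtual_partitions?
    virtual_partitions.active?
  end
end

# Without a partitioner the feature stays inactive.
plain = SketchTopic.new
plain_active = plain.virtual_partitions?

# With a partitioner the feature is active, and later calls return the
# already-built config regardless of the arguments they pass.
configured = SketchTopic.new
first = configured.virtual_partitions(
  partitioner: ->(message) { message[:user_id] },
  max_partitions: 3
)
second = configured.virtual_partitions(max_partitions: 10)

puts plain_active                  # false
puts configured.virtual_partitions? # true
puts first.equal?(second)           # true
puts second.max_partitions          # 3
```

In this sketch, as in the source method, arguments passed after the first call are ignored because the memoized object is returned as-is.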