Module: Karafka::Pro::Routing::Features::ParallelSegments::ConsumerGroup
- Defined in:
- lib/karafka/pro/routing/features/parallel_segments/consumer_group.rb
Overview
Parallel segments are defined on the consumer group (since it creates many), thus we define them on the consumer group. This module adds extra methods needed there to make it work
Instance Method Summary collapse
-
#parallel_segments ⇒ Config
Parallel segments config.
-
#parallel_segments=(count: 1, partitioner: nil, reducer: nil, merge_key: '-parallel-') ⇒ Object
Allows setting parallel segments configuration.
-
#parallel_segments? ⇒ Boolean
Are parallel segments active.
-
#segment_id ⇒ Integer
Id of the segment (0 or bigger) or -1 if parallel segments are not active.
-
#segment_origin ⇒ String
Original segment consumer group name.
-
#to_h ⇒ Hash
Consumer group setup with the parallel segments definition in it.
Instance Method Details
#parallel_segments ⇒ Config
Returns parallel segments config.
33 34 35 36 |
# File 'lib/karafka/pro/routing/features/parallel_segments/consumer_group.rb', line 33 def parallel_segments # We initialize it as disabled if not configured by the user public_send(:parallel_segments=, count: 1) end |
#parallel_segments=(count: 1, partitioner: nil, reducer: nil, merge_key: '-parallel-') ⇒ Object
This method is an assignor but the API is actually via the #parallel_segments method. Our Routing::Proxy normalizes that the way we want to have it exposed for the end users.
Allows setting parallel segments configuration
50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/karafka/pro/routing/features/parallel_segments/consumer_group.rb', line 50 def parallel_segments=( count: 1, partitioner: nil, reducer: nil, merge_key: '-parallel-' ) @parallel_segments ||= Config.new( active: count > 1, count: count, partitioner: partitioner, reducer: reducer || ->(parallel_key) { parallel_key.to_s.sum % count }, merge_key: merge_key ) end |
#parallel_segments? ⇒ Boolean
Returns are parallel segments active.
66 67 68 |
# File 'lib/karafka/pro/routing/features/parallel_segments/consumer_group.rb', line 66 def parallel_segments? parallel_segments.active? end |
#segment_id ⇒ Integer
Returns id of the segment (0 or bigger) or -1 if parallel segments are not active.
72 73 74 75 76 77 78 79 80 |
# File 'lib/karafka/pro/routing/features/parallel_segments/consumer_group.rb', line 72 def segment_id return @segment_id if @segment_id @segment_id = if parallel_segments? name.split(parallel_segments.merge_key).last.to_i else -1 end end |
#segment_origin ⇒ String
Returns original segment consumer group name.
83 84 85 |
# File 'lib/karafka/pro/routing/features/parallel_segments/consumer_group.rb', line 83 def segment_origin name.split(parallel_segments.merge_key).first end |
#to_h ⇒ Hash
Returns consumer group setup with the parallel segments definition in it.
88 89 90 91 92 93 94 |
# File 'lib/karafka/pro/routing/features/parallel_segments/consumer_group.rb', line 88 def to_h super.merge( parallel_segments: parallel_segments.to_h.merge( segment_id: segment_id ) ).freeze end |