Class: Racecar::ConsumerSet
- Inherits:
-
Object
- Object
- Racecar::ConsumerSet
- Defined in:
- lib/racecar/consumer_set.rb
Constant Summary collapse
- MAX_POLL_TRIES =
10
Instance Method Summary collapse
-
#batch_poll(max_wait_time_ms = @config.max_wait_time_ms, max_messages = @config.fetch_messages) ⇒ Object
batch_poll collects messages until any of the following occurs: - max_wait_time_ms time has passed - max_messages have been collected - a nil message was polled (end of topic, Kafka stalled, etc.).
- #close ⇒ Object
- #commit ⇒ Object
- #current ⇒ Object
- #each_subscribed ⇒ Object (also: #each)
-
#initialize(config, logger, instrumenter = NullInstrumenter) ⇒ ConsumerSet
constructor
A new instance of ConsumerSet.
- #pause(topic, partition, offset) ⇒ Object
- #poll(max_wait_time_ms = @config.max_wait_time_ms) ⇒ Object
- #resume(topic, partition) ⇒ Object
- #store_offset(message) ⇒ Object
-
#subscribe_all ⇒ Object
Subscribe to all topics eagerly, even if there’s still messages elsewhere.
Constructor Details
#initialize(config, logger, instrumenter = NullInstrumenter) ⇒ ConsumerSet
Returns a new instance of ConsumerSet.
7 8 9 10 11 12 13 14 15 16 17 18 |
# File 'lib/racecar/consumer_set.rb', line 7 def initialize(config, logger, instrumenter = NullInstrumenter) @config, @logger = config, logger @instrumenter = instrumenter raise ArgumentError, "Subscriptions must not be empty when subscribing" if @config.subscriptions.empty? @consumers = [] @consumer_id_iterator = (0...@config.subscriptions.size).cycle @previous_retries = 0 @last_poll_read_nil_message = false end |
Instance Method Details
#batch_poll(max_wait_time_ms = @config.max_wait_time_ms, max_messages = @config.fetch_messages) ⇒ Object
batch_poll collects messages until any of the following occurs:
-
max_wait_time_ms time has passed
-
max_messages have been collected
-
a nil message was polled (end of topic, Kafka stalled, etc.)
The messages are from a single topic, but potentially from more than one partition.
Any errors during polling are retried in an exponential backoff fashion. If an error occurs, but there is no time left for a backoff and retry, it will return the already collected messages and only retry on the next call.
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/racecar/consumer_set.rb', line 34 def batch_poll(max_wait_time_ms = @config.max_wait_time_ms, = @config.) started_at = Time.now remain_ms = max_wait_time_ms maybe_select_next_consumer = [] while remain_ms > 0 && .size < remain_ms = remaining_time_ms(max_wait_time_ms, started_at) msg = poll_with_retries(remain_ms) break if msg.nil? << msg end end |
#close ⇒ Object
66 67 68 |
# File 'lib/racecar/consumer_set.rb', line 66 def close each_subscribed(&:close) end |
#commit ⇒ Object
60 61 62 63 64 |
# File 'lib/racecar/consumer_set.rb', line 60 def commit each_subscribed do |consumer| commit_rescue_no_offset(consumer) end end |
#current ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/racecar/consumer_set.rb', line 70 def current @consumers[@consumer_id_iterator.peek] ||= begin consumer_config = Rdkafka::Config.new(rdkafka_config(current_subscription)) consumer_config.consumer_rebalance_listener = @config.rebalance_listener consumer = consumer_config.consumer @instrumenter.instrument('join_group') do consumer.subscribe current_subscription.topic end consumer end end |
#each_subscribed ⇒ Object Also known as: each
83 84 85 86 87 88 89 |
# File 'lib/racecar/consumer_set.rb', line 83 def each_subscribed if block_given? @consumers.each { |c| yield c } else @consumers.each end end |
#pause(topic, partition, offset) ⇒ Object
91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/racecar/consumer_set.rb', line 91 def pause(topic, partition, offset) consumer, filtered_tpl = find_consumer_by(topic, partition) if !consumer @logger.info "Attempted to pause #{topic}/#{partition}, but we're not subscribed to it" return end consumer.pause(filtered_tpl) fake_msg = OpenStruct.new(topic: topic, partition: partition, offset: offset) consumer.seek(fake_msg) end |
#poll(max_wait_time_ms = @config.max_wait_time_ms) ⇒ Object
20 21 22 |
# File 'lib/racecar/consumer_set.rb', line 20 def poll(max_wait_time_ms = @config.max_wait_time_ms) batch_poll(max_wait_time_ms, 1).first end |
#resume(topic, partition) ⇒ Object
103 104 105 106 107 108 109 110 111 |
# File 'lib/racecar/consumer_set.rb', line 103 def resume(topic, partition) consumer, filtered_tpl = find_consumer_by(topic, partition) if !consumer @logger.info "Attempted to resume #{topic}/#{partition}, but we're not subscribed to it" return end consumer.resume(filtered_tpl) end |
#store_offset(message) ⇒ Object
50 51 52 53 54 55 56 57 58 |
# File 'lib/racecar/consumer_set.rb', line 50 def store_offset() current.store_offset() rescue Rdkafka::RdkafkaError => e if e.code == :state # -172 @logger.warn "Attempted to store_offset, but we're not subscribed to it: #{ErroneousStateError.new(e)}" return end raise e end |
#subscribe_all ⇒ Object
Subscribe to all topics eagerly, even if there’s still messages elsewhere. Usually that’s not needed and Kafka might rebalance if topics are not polled frequently enough.
118 119 120 121 122 123 |
# File 'lib/racecar/consumer_set.rb', line 118 def subscribe_all @config.subscriptions.size.times do current select_next_consumer end end |