Class: Racecar::ConsumerSet

Inherits:
Object
  • Object
show all
Defined in:
lib/racecar/consumer_set.rb

Constant Summary collapse

MAX_POLL_TRIES =
10

Instance Method Summary collapse

Constructor Details

#initialize(config, logger, instrumenter = NullInstrumenter) ⇒ ConsumerSet

Returns a new instance of ConsumerSet.

Raises:

  • (ArgumentError)


7
8
9
10
11
12
13
14
15
16
17
18
# File 'lib/racecar/consumer_set.rb', line 7

# Sets up the bookkeeping for a set of consumers, one slot per subscription.
#
# No consumer is created here: the consumer list starts out empty and a
# cycling iterator over the subscription indexes tracks the active slot.
#
# @param config [Object] configuration; must respond to +subscriptions+
# @param logger [Object] logger kept for later use
# @param instrumenter [Object] instrumenter kept for later use
# @raise [ArgumentError] if +config.subscriptions+ is empty
def initialize(config, logger, instrumenter = NullInstrumenter)
  @config = config
  @logger = logger
  @instrumenter = instrumenter

  if @config.subscriptions.empty?
    raise ArgumentError, "Subscriptions must not be empty when subscribing"
  end

  # Endless 0, 1, ..., n-1, 0, 1, ... sequence of subscription indexes.
  subscription_count = @config.subscriptions.size
  @consumer_id_iterator = (0...subscription_count).cycle
  @consumers = []

  @previous_retries = 0
  @last_poll_read_nil_message = false
end

Instance Method Details

#batch_poll(max_wait_time_ms = @config.max_wait_time_ms, max_messages = @config.fetch_messages) ⇒ Object

batch_poll collects messages until any of the following occurs:

  • max_wait_time_ms time has passed

  • max_messages have been collected

  • a nil message was polled (end of topic, Kafka stalled, etc.)

The messages are from a single topic, but potentially from more than one partition.

Any errors during polling are retried in an exponential backoff fashion. If an error occurs, but there is no time left for a backoff and retry, it will return the already collected messages and only retry on the next call.



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/racecar/consumer_set.rb', line 34

# Polls repeatedly and gathers the results into one batch.
#
# Polling stops as soon as the remaining time budget is used up, the batch
# holds +max_messages+ entries, or a poll returns nil.
#
# @param max_wait_time_ms [Numeric] overall time budget in milliseconds
# @param max_messages [Integer] upper bound on the batch size
# @return [Array] the messages collected, possibly empty
def batch_poll(max_wait_time_ms = @config.max_wait_time_ms, max_messages = @config.fetch_messages)
  started_at = Time.now
  maybe_select_next_consumer

  batch = []
  time_left_ms = max_wait_time_ms

  until time_left_ms <= 0 || batch.size >= max_messages
    # Refresh the budget before every poll so the total wait stays bounded.
    time_left_ms = remaining_time_ms(max_wait_time_ms, started_at)
    polled = poll_with_retries(time_left_ms)
    break if polled.nil?

    batch << polled
  end

  batch
end

#close ⇒ Object



66
67
68
# File 'lib/racecar/consumer_set.rb', line 66

# Calls +close+ on every consumer yielded by #each_subscribed.
#
# @return [Object] whatever #each_subscribed returns
def close
  each_subscribed { |consumer| consumer.close }
end

#commit ⇒ Object



60
61
62
63
64
# File 'lib/racecar/consumer_set.rb', line 60

# Runs +commit_rescue_no_offset+ for every consumer yielded by
# #each_subscribed.
#
# @return [Object] whatever #each_subscribed returns
def commit
  each_subscribed { |subscribed| commit_rescue_no_offset(subscribed) }
end

#current ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/racecar/consumer_set.rb', line 70

# Returns the consumer for the slot the index iterator currently points at,
# creating and subscribing it on first use.
#
# A new consumer is built from +rdkafka_config(current_subscription)+, given
# the configured rebalance listener, and subscribed to the current
# subscription's topic inside a 'join_group' instrumentation block.
#
# @return [Object] the cached or newly created consumer
def current
  slot = @consumer_id_iterator.peek
  existing = @consumers[slot]
  return existing if existing

  rdkafka = Rdkafka::Config.new(rdkafka_config(current_subscription))
  rdkafka.consumer_rebalance_listener = @config.rebalance_listener
  new_consumer = rdkafka.consumer

  @instrumenter.instrument('join_group') { new_consumer.subscribe(current_subscription.topic) }

  # Cache the consumer so later calls for this slot reuse it.
  @consumers[slot] = new_consumer
end

#each_subscribed ⇒ Object Also known as: each



83
84
85
86
87
88
89
# File 'lib/racecar/consumer_set.rb', line 83

# Iterates over the consumers created so far.
#
# @yield [consumer] each entry of the internal consumer list, in order
# @return [Array, Enumerator] the consumer list when a block is given,
#   otherwise an enumerator over it
def each_subscribed
  return @consumers.each unless block_given?

  @consumers.each { |consumer| yield consumer }
end

#pause(topic, partition, offset) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
# File 'lib/racecar/consumer_set.rb', line 91

# Pauses consumption of a topic partition and seeks it to +offset+.
#
# If no consumer is found for the topic/partition, an info line is logged
# and nothing else happens.
#
# @param topic [String] topic name
# @param partition [Integer] partition number
# @param offset [Integer] offset to seek to after pausing
# @return [Object, nil] the result of the consumer's +seek+, or nil when no
#   consumer was found
def pause(topic, partition, offset)
  owner, partitions = find_consumer_by(topic, partition)

  unless owner
    @logger.info "Attempted to pause #{topic}/#{partition}, but we're not subscribed to it"
    return
  end

  owner.pause(partitions)
  # seek is handed a message-like object exposing topic, partition and offset.
  owner.seek(OpenStruct.new(topic: topic, partition: partition, offset: offset))
end

#poll(max_wait_time_ms = @config.max_wait_time_ms) ⇒ Object



20
21
22
# File 'lib/racecar/consumer_set.rb', line 20

# Fetches at most one message by delegating to #batch_poll with a batch
# size of 1.
#
# @param max_wait_time_ms [Numeric] time budget passed through to #batch_poll
# @return [Object, nil] the first element of the batch, or nil if it is empty
def poll(max_wait_time_ms = @config.max_wait_time_ms)
  single_batch = batch_poll(max_wait_time_ms, 1)
  single_batch.first
end

#resume(topic, partition) ⇒ Object



103
104
105
106
107
108
109
110
111
# File 'lib/racecar/consumer_set.rb', line 103

# Resumes consumption of a topic partition.
#
# If no consumer is found for the topic/partition, an info line is logged
# and nothing else happens.
#
# @param topic [String] topic name
# @param partition [Integer] partition number
# @return [Object, nil] the result of the consumer's +resume+, or nil when
#   no consumer was found
def resume(topic, partition)
  owner, partitions = find_consumer_by(topic, partition)

  unless owner
    @logger.info "Attempted to resume #{topic}/#{partition}, but we're not subscribed to it"
    return
  end

  owner.resume(partitions)
end

#store_offset(message) ⇒ Object



50
51
52
53
54
55
56
57
58
# File 'lib/racecar/consumer_set.rb', line 50

# Stores the offset of +message+ on the current consumer.
#
# An Rdkafka error whose code is :state is logged as a warning and
# swallowed; every other Rdkafka error is re-raised to the caller.
#
# @param message [Object] the message whose offset should be stored
# @return [Object, nil] the result of the consumer's +store_offset+, or nil
#   when a :state error was swallowed
# @raise [Rdkafka::RdkafkaError] for any error code other than :state
def store_offset(message)
  current.store_offset(message)
rescue Rdkafka::RdkafkaError => error
  raise error unless error.code == :state # -172

  @logger.warn "Attempted to store_offset, but we're not subscribed to it: #{ErroneousStateError.new(error)}"
  nil
end

#subscribe_all ⇒ Object

Subscribe to all topics eagerly, even if there are still messages elsewhere. Usually that’s not needed, and Kafka might rebalance if topics are not polled frequently enough.



118
119
120
121
122
123
# File 'lib/racecar/consumer_set.rb', line 118

# Walks once through all subscription slots, calling #current and then
# +select_next_consumer+ for each one.
#
# @return [Integer] the number of subscriptions
def subscribe_all
  subscription_count = @config.subscriptions.size

  subscription_count.times do
    current
    select_next_consumer
  end
end