Class: Kafka::OffsetManager

Inherits: Object

Defined in: lib/kafka/offset_manager.rb

Overview

Manages a consumer's position in partitions and determines where to resume processing from.

Constant Summary

DEFAULT_RETENTION_TIME = 1440 * 60

The default broker setting for offsets.retention.minutes is 1440, i.e. 24 hours expressed here in seconds.

Instance Method Summary

Constructor Details

#initialize(cluster:, group:, fetcher:, logger:, commit_interval:, commit_threshold:, offset_retention_time:) ⇒ OffsetManager

Returns a new instance of OffsetManager.



# File 'lib/kafka/offset_manager.rb', line 12

def initialize(cluster:, group:, fetcher:, logger:, commit_interval:, commit_threshold:, offset_retention_time:)
  @cluster = cluster
  @group = group
  @fetcher = fetcher
  @logger = TaggedLogger.new(logger)
  @commit_interval = commit_interval
  @commit_threshold = commit_threshold

  @uncommitted_offsets = 0
  @processed_offsets = {}
  @default_offsets = {}
  @committed_offsets = nil
  @resolved_offsets = {}
  @last_commit = Time.now
  @last_recommit = nil
  @recommit_interval = (offset_retention_time || DEFAULT_RETENTION_TIME) / 2
end
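The last line of the constructor is worth a closer look: the recommit interval is half the offset retention time, so committed positions are refreshed well before the broker expires them. A minimal plain-Ruby sketch of that computation:

```ruby
# Mirrors the constructor's recommit-interval computation above.
DEFAULT_RETENTION_TIME = 1440 * 60 # seconds; the broker default is 1440 minutes

offset_retention_time = nil # nil when not configured explicitly
recommit_interval = (offset_retention_time || DEFAULT_RETENTION_TIME) / 2

recommit_interval # => 43200, i.e. recommit at least every 12 hours
```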

Instance Method Details

#clear_offsets ⇒ nil

Clear all stored offset information.

Returns:

  • (nil)


# File 'lib/kafka/offset_manager.rb', line 160

def clear_offsets
  @processed_offsets.clear
  @resolved_offsets.clear

  # Clear the cached commits from the brokers.
  @committed_offsets = nil
end

#clear_offsets_excluding(excluded) ⇒ nil

Clear stored offset information for all partitions except those specified in excluded.

offset_manager.clear_offsets_excluding("my-topic" => [1, 2, 3])

Returns:

  • (nil)


# File 'lib/kafka/offset_manager.rb', line 174

def clear_offsets_excluding(excluded)
  # Clear all offsets that aren't in `excluded`.
  @processed_offsets.each do |topic, partitions|
    partitions.keep_if do |partition, _|
      excluded.fetch(topic, []).include?(partition)
    end
  end

  # Clear the cached commits from the brokers.
  @committed_offsets = nil
  @resolved_offsets.clear
end
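The `keep_if` filtering can be illustrated with plain hashes (the topic names and offsets here are made up):

```ruby
processed_offsets = {
  "events" => { 0 => 10, 1 => 20, 2 => 30 },
  "logs"   => { 0 => 5 }
}
excluded = { "events" => [1, 2] }

# Keep only the partitions listed in `excluded`; everything else is dropped.
processed_offsets.each do |topic, partitions|
  partitions.keep_if { |partition, _| excluded.fetch(topic, []).include?(partition) }
end

processed_offsets # => {"events"=>{1=>20, 2=>30}, "logs"=>{}}
```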

#commit_offsets(recommit = false) ⇒ nil

Commit offsets of messages that have been marked as processed.

If recommit is set to true, we will also commit the existing positions even if no messages have been processed on a partition. This is done in order to avoid the offset information expiring in cases where messages are very rare -- it's essentially a keep-alive.

Parameters:

  • recommit (Boolean) (defaults to: false)

    whether to recommit offsets that have already been committed.

Returns:

  • (nil)


# File 'lib/kafka/offset_manager.rb', line 131

def commit_offsets(recommit = false)
  offsets = offsets_to_commit(recommit)
  unless offsets.empty?
    @logger.debug "Committing offsets#{recommit ? ' with recommit' : ''}: #{prettify_offsets(offsets)}"

    @group.commit_offsets(offsets)

    @last_commit = Time.now
    @last_recommit = Time.now if recommit

    @uncommitted_offsets = 0
    @committed_offsets = nil
  end
end
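A rough sketch of the recommit behaviour — the hash shapes and the `offsets_to_commit` lambda are assumptions based on this method's description, not the gem's exact internals. On a recommit, positions already committed to the broker are merged back in so that idle partitions keep their offsets alive:

```ruby
processed_offsets = { "events" => { 0 => 42 } } # freshly processed locally
committed_offsets = { "events" => { 1 => 10 } } # already known to the broker

offsets_to_commit = lambda do |recommit|
  if recommit
    # Re-commit existing positions too, so quiet partitions don't expire.
    committed_offsets.merge(processed_offsets) do |_topic, committed, processed|
      committed.merge(processed)
    end
  else
    processed_offsets
  end
end

offsets_to_commit.call(false) # => {"events"=>{0=>42}}
offsets_to_commit.call(true)  # => {"events"=>{1=>10, 0=>42}}
```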

#commit_offsets_if_necessary ⇒ nil

Commit offsets if necessary, according to the offset commit policy specified when initializing the class.

Returns:

  • (nil)


# File 'lib/kafka/offset_manager.rb', line 150

def commit_offsets_if_necessary
  recommit = recommit_timeout_reached?
  if recommit || commit_timeout_reached? || commit_threshold_reached?
    commit_offsets(recommit)
  end
end
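The policy checks can be sketched with plain values. The predicate names mirror the private helpers called above; the thresholds are invented for illustration:

```ruby
commit_interval  = 10   # seconds between time-based commits
commit_threshold = 100  # commit once this many messages are uncommitted

last_commit         = Time.now - 15 # last commit was 15 seconds ago
uncommitted_offsets = 42

commit_timeout_reached   = commit_interval.positive? && Time.now - last_commit >= commit_interval
commit_threshold_reached = commit_threshold.positive? && uncommitted_offsets >= commit_threshold

commit_timeout_reached || commit_threshold_reached # => true: the 10s interval has passed
```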

#mark_as_processed(topic, partition, offset) ⇒ nil

Mark a message as having been processed.

When offsets are committed, the message's offset will be stored in Kafka so that we can resume from this point at a later time.

Parameters:

  • topic (String)

    the name of the topic.

  • partition (Integer)

    the partition number.

  • offset (Integer)

    the offset of the message that should be marked as processed.

Returns:

  • (nil)


# File 'lib/kafka/offset_manager.rb', line 52

def mark_as_processed(topic, partition, offset)
  unless @group.assigned_to?(topic, partition)
    @logger.debug "Not marking #{topic}/#{partition}:#{offset} as processed for partition not assigned to this consumer."
    return
  end
  @processed_offsets[topic] ||= {}

  last_processed_offset = @processed_offsets[topic][partition] || -1
  if last_processed_offset > offset + 1
    @logger.debug "Not overwriting newer offset #{topic}/#{partition}:#{last_processed_offset - 1} with older #{offset}"
    return
  end

  @uncommitted_offsets += 1

  # The committed offset should always be the offset of the next message that the
  # application will read, thus adding one to the last message processed.
  @processed_offsets[topic][partition] = offset + 1
  @logger.debug "Marking #{topic}/#{partition}:#{offset} as processed"
end
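The offset-plus-one bookkeeping can be demonstrated standalone (simplified sketch; the group-assignment check and logging are omitted):

```ruby
processed_offsets = Hash.new { |h, topic| h[topic] = {} }

mark_as_processed = lambda do |topic, partition, offset|
  last = processed_offsets[topic][partition] || -1
  # Don't overwrite a newer position with an older offset.
  next if last > offset + 1
  # Store the offset of the *next* message to read, not the one just processed.
  processed_offsets[topic][partition] = offset + 1
end

mark_as_processed.call("events", 0, 41)
processed_offsets["events"][0] # => 42
mark_as_processed.call("events", 0, 10) # stale offset, ignored
processed_offsets["events"][0] # => 42, unchanged
```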

#next_offset_for(topic, partition) ⇒ Integer

Return the next offset that should be fetched for the specified partition.

Parameters:

  • topic (String)

    the name of the topic.

  • partition (Integer)

    the partition number.

Returns:

  • (Integer)

    the next offset that should be fetched.



# File 'lib/kafka/offset_manager.rb', line 106

def next_offset_for(topic, partition)
  offset = @processed_offsets.fetch(topic, {}).fetch(partition) {
    committed_offset_for(topic, partition)
  }

  # A negative offset means that no offset has been committed, so we need to
  # resolve the default offset for the topic.
  if offset < 0
    resolve_offset(topic, partition)
  else
    # The next offset is the last offset.
    offset
  end
end
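The fallback chain — locally processed position, then the broker's committed offset, then the resolved default — can be sketched with stand-in values (the committed and default offsets here are assumed):

```ruby
processed_offsets = { "events" => { 0 => 42 } }
committed_offset  = -1 # what the broker reports when nothing has been committed
default_offset    = 0  # e.g. the resolved :earliest offset

next_offset_for = lambda do |topic, partition|
  offset = processed_offsets.fetch(topic, {}).fetch(partition) { committed_offset }
  # A negative offset means nothing was committed; fall back to the default.
  offset.negative? ? default_offset : offset
end

next_offset_for.call("events", 0) # => 42, the locally processed position
next_offset_for.call("events", 1) # => 0, falls back to the default
```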

#seek_to(topic, partition, offset) ⇒ nil

Move the consumer's position in the partition to the specified offset.

Parameters:

  • topic (String)

    the name of the topic.

  • partition (Integer)

    the partition number.

  • offset (Integer)

    the offset that the consumer position should be moved to.

Returns:

  • (nil)


# File 'lib/kafka/offset_manager.rb', line 94

def seek_to(topic, partition, offset)
  @processed_offsets[topic] ||= {}
  @processed_offsets[topic][partition] = offset

  @fetcher.seek(topic, partition, offset)
end

#seek_to_default(topic, partition) ⇒ nil

Move the consumer's position in the partition back to the configured default offset, either the earliest or the latest offset in the partition.

Parameters:

  • topic (String)

    the name of the topic.

  • partition (Integer)

    the partition number.

Returns:

  • (nil)


# File 'lib/kafka/offset_manager.rb', line 79

def seek_to_default(topic, partition)
  # Remove any cached offset, in case things have changed broker-side.
  clear_resolved_offset(topic)

  offset = resolve_offset(topic, partition)

  seek_to(topic, partition, offset)
end

#set_default_offset(topic, default_offset) ⇒ nil

Set the default offset for a topic.

When the consumer is started for the first time, or in cases where it gets stuck and has to reset its position, it must start either with the earliest messages or with the latest, skipping to the very end of each partition.

Parameters:

  • topic (String)

    the name of the topic.

  • default_offset (Symbol)

    either :earliest or :latest.

Returns:

  • (nil)


# File 'lib/kafka/offset_manager.rb', line 39

def set_default_offset(topic, default_offset)
  @default_offsets[topic] = default_offset
end