Method: Cosmos::Store#update_topic_offsets

Defined in:
lib/cosmos/utilities/store_autoload.rb

#update_topic_offsets(topics) ⇒ Object



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/cosmos/utilities/store_autoload.rb', line 139

# Build the list of stream offsets to read from, one per requested topic.
#
# A topic that already has an entry in @topic_offsets reuses that entry, so a
# following xread picks up everything past that point. A topic without an
# entry is being seen for the first time: its offset is obtained from
# get_last_offset and remembered in @topic_offsets, so reading starts from
# "now" rather than from the beginning.
#
# @param topics [Enumerable] topic names to resolve offsets for
# @return [Array] offsets in the same order as +topics+
def update_topic_offsets(topics)
  topics.map do |name|
    # ||= only calls get_last_offset (and stores its result) when nothing
    # truthy is cached for this topic yet.
    @topic_offsets[name] ||= get_last_offset(name)
  end
end