Method: Cosmos::Store#update_topic_offsets
- Defined in:
- lib/cosmos/utilities/store_autoload.rb
#update_topic_offsets(topics) ⇒ Object
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
# File 'lib/cosmos/utilities/store_autoload.rb', line 139

# Resolves the offset to read from for each of the given topics.
#
# A topic that already has a truthy entry in @topic_offsets reuses it, which
# allows xread to get everything past that point. A topic without an entry is
# being seen for the first time: its last offset ID is fetched with
# get_last_offset and cached in @topic_offsets, so reading starts from now on.
#
# @param topics [Enumerable] topics to resolve; each one is used as a key
#   into @topic_offsets (presumably an Array of topic names - confirm with callers)
# @return [Array] one offset per topic, in the same order as topics
def update_topic_offsets(topics)
  topics.map do |topic|
    # ||= only calls get_last_offset when no offset is cached for this topic,
    # and stores the result so later calls reuse it.
    @topic_offsets[topic] ||= get_last_offset(topic)
  end
end