161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
|
# File 'lib/openc3/utilities/store_autoload.rb', line 161
# Reads messages from the given topics with a single +xread+ call on a pooled
# Redis connection.
#
# For every message returned, the message ID is stored under its topic in the
# thread-local +Thread.current[:topic_offsets]+ hash, and the message is
# yielded to the block when one is given.
#
# @param topics [Array] stream keys handed to +xread+ (presumably topic name
#   strings; confirm against callers)
# @param offsets [Array, nil] IDs handed to +xread+; when not supplied they are
#   taken from +update_topic_offsets(topics)+
# @param timeout_ms [Integer] forwarded to +xread+ as its +block:+ option
# @param count [Integer, nil] forwarded to +xread+ as its +count:+ option
# @yield [topic, msg_id, msg_hash, redis] once per message, in the order
#   +xread+ returned them; +redis+ is the connection checked out of the pool
# @return [Object] whatever +xread+ returned, or an empty Hash when +topics+
#   is empty or +Redis::TimeoutError+ was raised
def read_topics(topics, offsets = nil, timeout_ms = 1000, count = nil)
  # Nothing to read: skip the pool checkout and avoid issuing an XREAD that
  # names no streams.
  return {} if topics.empty?

  topic_offsets = (Thread.current[:topic_offsets] ||= {})
  @redis_pool.with do |redis|
    offsets ||= update_topic_offsets(topics)
    result = redis.xread(topics, offsets, block: timeout_ms, count: count)
    if result && !result.empty?
      result.each do |topic, messages|
        messages.each do |msg_id, msg_hash|
          # Remember the newest ID seen on this topic for the current thread.
          topic_offsets[topic] = msg_id
          yield topic, msg_id, msg_hash, redis if block_given?
        end
      end
    end
    # Explicit return so the value leaves read_topics directly from inside
    # the pool block, independent of what +with+ itself returns.
    return result
  end
rescue Redis::TimeoutError
  # Same shape as a read that found nothing.
  {}
end
|