Method: OpenC3::Store#read_topics

Defined in:
lib/openc3/utilities/store_autoload.rb

#read_topics(topics, offsets = nil, timeout_ms = 1000, count = nil) ⇒ Object



161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/openc3/utilities/store_autoload.rb', line 161

def read_topics(topics, offsets = nil, timeout_ms = 1000, count = nil)
  Thread.current[:topic_offsets] ||= {}
  topic_offsets = Thread.current[:topic_offsets]
  begin
    # Logger.debug "read_topics: #{topics}, #{offsets} pool:#{@redis_pool}"
    @redis_pool.with do |redis|
      offsets = update_topic_offsets(topics) unless offsets
      result = redis.xread(topics, offsets, block: timeout_ms, count: count)
      if result and result.length > 0
        result.each do |topic, messages|
          messages.each do |msg_id, msg_hash|
            topic_offsets[topic] = msg_id
            yield topic, msg_id, msg_hash, redis if block_given?
          end
        end
      end
      # Logger.debug "result:#{result}" if result and result.length > 0
      return result
    end
  rescue Redis::TimeoutError
    return {} # Should return an empty hash not array - xread returns a hash
  end
end