Class: OpenC3::Store

Inherits:
Object show all
Defined in:
lib/openc3/utilities/store_autoload.rb

Direct Known Subclasses

EphemeralStore

Constant Summary collapse

@@instance_mutex =

Mutex used to ensure that only one instance is created

Mutex.new

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(pool_size = 10) ⇒ Store

Returns a new instance of Store.



93
94
95
96
97
98
# File 'lib/openc3/utilities/store_autoload.rb', line 93

def initialize(pool_size = 10)
  @redis_username = ENV['OPENC3_REDIS_USERNAME']
  @redis_key = ENV['OPENC3_REDIS_PASSWORD']
  @redis_url = "redis://#{ENV['OPENC3_REDIS_HOSTNAME']}:#{ENV['OPENC3_REDIS_PORT']}"
  @redis_pool = StoreConnectionPool.new(size: pool_size) { build_redis() }
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(message, *args, **kwargs, &block) ⇒ Object

Delegate all unknown methods to redis through the @redis_pool



89
90
91
# File 'lib/openc3/utilities/store_autoload.rb', line 89

def method_missing(message, *args, **kwargs, &block)
  @redis_pool.with { |redis| redis.public_send(message, *args, **kwargs, &block) }
end

Instance Attribute Details

#redis_poolObject (readonly)

Returns the value of attribute redis_pool.



70
71
72
# File 'lib/openc3/utilities/store_autoload.rb', line 70

def redis_pool
  @redis_pool
end

#redis_urlObject (readonly)

Returns the value of attribute redis_url.



69
70
71
# File 'lib/openc3/utilities/store_autoload.rb', line 69

def redis_url
  @redis_url
end

Class Method Details

.instance(pool_size = 100) ⇒ Object

Get the singleton instance



73
74
75
76
77
78
79
80
81
# File 'lib/openc3/utilities/store_autoload.rb', line 73

def self.instance(pool_size = 100)
  # Logger.level = Logger::DEBUG
  return @instance if @instance

  @@instance_mutex.synchronize do
    @instance ||= self.new(pool_size)
    return @instance
  end
end

.method_missing(message, *args, **kwargs, &block) ⇒ Object

Delegate all unknown class methods to delegate to the instance



84
85
86
# File 'lib/openc3/utilities/store_autoload.rb', line 84

def self.method_missing(message, *args, **kwargs, &block)
  self.instance.public_send(message, *args, **kwargs, &block)
end

Instance Method Details

#build_redisObject



101
102
103
# File 'lib/openc3/utilities/store_autoload.rb', line 101

def build_redis
  return Redis.new(url: @redis_url, username: @redis_username, password: @redis_key)
end

#get_last_offset(topic) ⇒ Object



135
136
137
138
139
140
141
142
143
144
# File 'lib/openc3/utilities/store_autoload.rb', line 135

def get_last_offset(topic)
  @redis_pool.with do |redis|
    result = redis.xrevrange(topic, count: 1)
    if result and result[0] and result[0][0]
      result[0][0]
    else
      "0-0"
    end
  end
end

#get_newest_message(topic) ⇒ Object



121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/openc3/utilities/store_autoload.rb', line 121

def get_newest_message(topic)
  @redis_pool.with do |redis|
    # Default in xrevrange is range end '+', start '-' which means get all
    # elements from higher ID to lower ID and since we're limiting to 1
    # we get the last element. See https://redis.io/commands/xrevrange.
    result = redis.xrevrange(topic, count: 1)
    if result and result.length > 0
      return result[0]
    else
      return nil
    end
  end
end

#get_oldest_message(topic) ⇒ Object

Stream APIs



110
111
112
113
114
115
116
117
118
119
# File 'lib/openc3/utilities/store_autoload.rb', line 110

def get_oldest_message(topic)
  @redis_pool.with do |redis|
    result = redis.xrange(topic, count: 1)
    if result and result.length > 0
      return result[0]
    else
      return nil
    end
  end
end

#read_topics(topics, offsets = nil, timeout_ms = 1000, count = nil) ⇒ Object



167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/openc3/utilities/store_autoload.rb', line 167

def read_topics(topics, offsets = nil, timeout_ms = 1000, count = nil)
  return {} if topics.empty?
  Thread.current[:topic_offsets] ||= {}
  topic_offsets = Thread.current[:topic_offsets]
  begin
    # Logger.debug "read_topics: #{topics}, #{offsets} pool:#{@redis_pool}"
    @redis_pool.with do |redis|
      offsets = update_topic_offsets(topics) unless offsets
      result = redis.xread(topics, offsets, block: timeout_ms, count: count)
      if result and result.length > 0
        result.each do |topic, messages|
          messages.each do |msg_id, msg_hash|
            topic_offsets[topic] = msg_id
            yield topic, msg_id, msg_hash, redis if block_given?
          end
        end
      end
      # Logger.debug "result:#{result}" if result and result.length > 0
      return result
    end
  rescue Redis::TimeoutError
    return {} # Should return an empty hash not array - xread returns a hash
  end
end

#trim_topic(topic, minid, approximate = true, limit: 0) ⇒ Integer

Trims older entries of the redis stream if needed. > www.rubydoc.info/github/redis/redis-rb/Redis:xtrim

Examples:

Without options

store.trim_topic('MANGO__TOPIC', 1000)

With options

store.trim_topic('MANGO__TOPIC', 1000, approximate: 'true', limit: 0)

Parameters:

  • topic (String)

    the stream key

  • minid (Integer)

    Id to throw away data up to

  • approximate (Boolean) (defaults to: true)

    whether to add '~` modifier of maxlen or not

  • limit (Boolean) (defaults to: 0)

    number of items to return from the call

Returns:

  • (Integer)

    the number of entries actually deleted



231
232
233
234
235
# File 'lib/openc3/utilities/store_autoload.rb', line 231

def trim_topic(topic, minid, approximate = true, limit: 0)
  @redis_pool.with do |redis|
    return redis.xtrim_minid(topic, minid, approximate: approximate, limit: limit)
  end
end

#update_topic_offsets(topics) ⇒ Object



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/openc3/utilities/store_autoload.rb', line 146

def update_topic_offsets(topics)
  offsets = []
  topics.each do |topic|
    # Normally we will just be grabbing the topic offset
    # this allows xread to get everything past this point
    Thread.current[:topic_offsets] ||= {}
    topic_offsets = Thread.current[:topic_offsets]
    last_id = topic_offsets[topic]
    if last_id
      offsets << last_id
    else
      # If there is no topic offset this is the first call.
      # Get the last offset ID so we'll start getting everything from now on
      offsets << get_last_offset(topic)
      topic_offsets[topic] = offsets[-1]
    end
  end
  return offsets
end

#write_topic(topic, msg_hash, id = '*', maxlen = nil, approximate = 'true') ⇒ String

Add new entry to the redis stream. > www.rubydoc.info/github/redis/redis-rb/Redis:xadd

Examples:

Without options

store.write_topic('MANGO__TOPIC', {'message' => 'something'})

With options

store.write_topic('MANGO__TOPIC', {'message' => 'something'}, id: '0-0', maxlen: 1000, approximate: 'true')

Parameters:

  • topic (String)

    the stream / topic

  • msg_hash (Hash)

    one or multiple field-value pairs

  • opts (Hash)

    a customizable set of options

Returns:



210
211
212
213
214
215
# File 'lib/openc3/utilities/store_autoload.rb', line 210

def write_topic(topic, msg_hash, id = '*', maxlen = nil, approximate = 'true')
  id = '*' if id.nil?
  @redis_pool.with do |redis|
    return redis.xadd(topic, msg_hash, id: id, maxlen: maxlen, approximate: approximate)
  end
end