Class: HeavyKeeper::TopK

Inherits:
Object
  • Object
show all
Defined in:
lib/heavy_keeper/top_k.rb

Overview

rubocop:disable Metrics/ClassLength

Constant Summary collapse

Validator =
::Dry::Schema.Params do
  required(:top_k).filled(:integer, gt?: 0)
  required(:width).filled(:integer, gt?: 0)
  required(:depth).filled(:integer, gt?: 0)
  required(:decay).filled(:decimal, gt?: 0, lteq?: 1)
end

Instance Method Summary collapse

Constructor Details

#initialize(storage: HeavyKeeper.config.storage) ⇒ HeavyKeeper::TopK

Initiate the controller to create/operate on top-k DS



22
23
24
25
26
# File 'lib/heavy_keeper/top_k.rb', line 22

def initialize(storage: HeavyKeeper.config.storage)
  @storage = storage
  @min_heap = MinHeap.new(storage)
  @bucket = Bucket.new(storage)
end

Instance Method Details

#add(key, *items) ⇒ Array[Nil, Integer]

Complexity O(k + depth) Add an array of items to a Top-K DS



59
60
61
62
# File 'lib/heavy_keeper/top_k.rb', line 59

def add(key, *items)
  items_and_increments = items.map { |item| [item, 1] }
  increase_by(key, *items_and_increments)
end

#clear(key) ⇒ Object

Complexity O(1) Clean up all Redis data related to a key



175
176
177
178
179
180
181
# File 'lib/heavy_keeper/top_k.rb', line 175

def clear(key)
  storage.multi do
    storage.del((key))
    min_heap.clear(key)
    bucket.clear(key)
  end
end

#count(key, *items) ⇒ Array[Integer]

Complexity O(k + depth) Please note this number will never be higher than the real count and likely to be lower. Multiple items can be queried at once.



151
152
153
154
155
# File 'lib/heavy_keeper/top_k.rb', line 151

def count(key, *items)
  items.map do |item|
    min_heap.count(key, item)
  end
end

#increase_by(key, *items_and_increments) ⇒ Array[Nil, String]

Complexity O(k + (increment * depth)) Add an array of items to a Top-K DS, with custom increment for each item

rubocop:disable Metrics/AbcSize rubocop:disable Metrics/MethodLength rubocop:disable Metrics/BlockLength rubocop:disable Metrics/PerceivedComplexity



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/heavy_keeper/top_k.rb', line 79

def increase_by(key, *items_and_increments)
  options = validate(storage.hgetall((key)))

  items_and_increments.map do |(item, increment)|
    max_count = 0
    item_fingerprint = XXhash.xxh64(item)

    exist = min_heap.exist?(key, item)
    min_value = min_heap.min(key)

    options[:depth].times do |i|
      bucket_number = XXhash.xxh64_stream(StringIO.new(item), i) % options[:width]

      fingerprint, count = bucket.get(key, i, bucket_number)

      if count.nil? || count.zero?
        bucket.set(key, i, bucket_number, [item_fingerprint, increment])
        max_count = [increment, max_count].max
      elsif fingerprint == item_fingerprint
        if exist || count <= min_value
          bucket.set(key, i, bucket_number, [fingerprint, count + increment])
          max_count = [count + increment, max_count].max
        end
      else
        decay = options[:decay]**count

        if SecureRandom.rand < decay
          count -= increment

          if count.positive?
            bucket.set(key, i, bucket_number, [fingerprint, count])
          else
            bucket.set(key, i, bucket_number, [item_fingerprint, increment])
            max_count = [increment, max_count].max
          end
        end
      end
    end

    if exist
      min_heap.update(key, item, max_count)
    else
      min_heap.add(key, item, max_count, options[:top_k])
    end
  end
end

#list(key) ⇒ Hash

Complexity O(k) Return full list of items in Top K list.



164
165
166
167
# File 'lib/heavy_keeper/top_k.rb', line 164

def list(key)
  top_k = storage.hget((key), :top_k).to_i
  min_heap.list(key, top_k)
end

#query(key, *items) ⇒ Array[Boolean]

Complexity O(k) Checks whether an item is one of Top-K items. Multiple items can be checked at once.



137
138
139
140
141
# File 'lib/heavy_keeper/top_k.rb', line 137

def query(key, *items)
  items.map do |item|
    min_heap.exist?(key, item)
  end
end

#remove(key, item) ⇒ Object

Complexity O(1) Reset counter of an item to zero in order to decay it out



190
191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/heavy_keeper/top_k.rb', line 190

def remove(key, item)
  options = validate(storage.hgetall((key)))
  item_fingerprint = XXhash.xxh64(item)

  options[:depth].times do |i|
    bucket_number = XXhash.xxh64_stream(StringIO.new(item), i) % options[:width]
    fingerprint, _ = bucket.get(key, i, bucket_number)

    bucket.set(key, i, bucket_number, [fingerprint, 0]) if item_fingerprint == fingerprint
  end

  min_heap.delete(key, item)
end

#reserve(key, options) ⇒ Object

Complexity O(1) Initialize a TopK in Redis with specified parameters.



39
40
41
42
43
44
45
46
47
# File 'lib/heavy_keeper/top_k.rb', line 39

def reserve(key, options)
  options = validate(options)
  # We need to use Decimal when validating to accept integer
  # number. However, we need to convert to float when save to Redis
  # because redis-rb 5 don't support Decimal type
  options[:decay] = options[:decay].to_f

  storage.mapped_hmset((key), options)
end